From 22d9448f85c94339ef5d49d250b2fdefba9bedef Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 26 Feb 2016 15:16:39 -0800
Subject: [PATCH 01/16] Add SessionState template

---
 .../org/apache/spark/sql/SQLContext.scala    |  4 +-
 .../org/apache/spark/sql/SessionState.scala  | 79 +++++++++++++++++++
 2 files changed, 81 insertions(+), 2 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/SessionState.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 1c24d9e4aeb0a..f56c273b090cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 import scala.collection.immutable
 import scala.reflect.runtime.universe.TypeTag
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkException}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
@@ -68,7 +68,7 @@ class SQLContext private[sql](
     @transient protected[sql] val cacheManager: CacheManager,
     @transient private[sql] val listener: SQLListener,
     val isRootContext: Boolean)
-  extends org.apache.spark.Logging with Serializable {
+  extends Logging with Serializable {
 
   self =>
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/SessionState.scala
new file mode 100644
index 0000000000000..891921b4bd0cf
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SessionState.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.ParserInterface
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, Catalog, FunctionRegistry, SimpleCatalog}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.datasources.{PreInsertCastAndRename, ResolveDataSource}
+import org.apache.spark.sql.execution.exchange.EnsureRequirements
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.ExecutionListenerManager
+
+
+/**
+ * A class that holds all session-specific state in a given [[SQLContext]].
+ */ +private[sql] class SessionState(ctx: SQLContext) { + + // TODO: add comments everywhere + + val conf = new SQLConf + + val catalog: Catalog = new SimpleCatalog(conf) + + val listenerManager: ExecutionListenerManager = new ExecutionListenerManager + + val continuousQueryManager: ContinuousQueryManager = new ContinuousQueryManager(ctx) + + val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy() + + val udf: UDFRegistration = new UDFRegistration(ctx) + + val analyzer: Analyzer = { + new Analyzer(catalog, functionRegistry, conf) { + override val extendedResolutionRules = + python.ExtractPythonUDFs :: + PreInsertCastAndRename :: + (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil) + + override val extendedCheckRules = Seq(datasources.PreWriteCheck(catalog)) + } + } + + val optimizer: Optimizer = new SparkOptimizer(ctx) + + val sqlParser: ParserInterface = new SparkQl(conf) + + val planner: SparkPlanner = new SparkPlanner(ctx) + + /** + * Prepares a planned SparkPlan for execution by inserting shuffle operations and internal + * row format conversions as needed. + */ + val prepareForExecution = new RuleExecutor[SparkPlan] { + val batches = Seq( + Batch("Subquery", Once, PlanSubqueries(ctx)), + Batch("Add exchange", Once, EnsureRequirements(ctx)), + Batch("Whole stage codegen", Once, CollapseCodegenStages(ctx)) + ) + } + +} From 5589ee5768449be05bb3e6337cd1b89c06c65228 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 26 Feb 2016 16:52:43 -0800 Subject: [PATCH 02/16] Move fields to SessionState for SQL/HiveContext --- .../org/apache/spark/sql/SQLContext.scala | 68 +++++------- .../org/apache/spark/sql/SessionState.scala | 16 +-- .../spark/sql/test/TestSQLContext.scala | 21 ++-- .../apache/spark/sql/hive/HiveContext.scala | 87 +++------------ .../spark/sql/hive/HiveSessionState.scala | 100 ++++++++++++++++++ .../hive/execution/CreateTableAsSelect.scala | 6 +- .../hive/execution/CreateViewAsSelect.scala | 6 +- .../hive/execution/InsertIntoHiveTable.scala | 2 +- .../spark/sql/hive/execution/commands.scala | 14 +-- .../apache/spark/sql/hive/test/TestHive.scala | 6 +- 10 files changed, 171 insertions(+), 155 deletions(-) create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index f56c273b090cb..1405547065406 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -40,7 +40,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.exchange.EnsureRequirements import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.SQLConfEntry @@ -114,10 +113,16 @@ class SQLContext private[sql]( isRootContext = false) } + /** + * Blargh blargh blargh track the session state of this context blargh. + */ + @transient + protected[sql] val sessionState: SessionState = new SessionState(self) + /** * @return Spark SQL configuration */ - protected[sql] lazy val conf = new SQLConf + protected[sql] def conf: SQLConf = sessionState.conf /** * Set Spark SQL configuration properties. 
@@ -179,35 +184,28 @@ class SQLContext private[sql]( */ def getAllConfs: immutable.Map[String, String] = conf.getAllConfs - @transient - lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager + /** + * Blargh blargh I listen to punk rock blargh! + */ + def listenerManager: ExecutionListenerManager = sessionState.listenerManager - protected[sql] lazy val continuousQueryManager = new ContinuousQueryManager(this) + protected[sql] def continuousQueryManager = sessionState.continuousQueryManager - @transient - protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf) + protected[sql] def catalog: Catalog = sessionState.catalog - @transient - protected[sql] lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy() + protected[sql] def functionRegistry: FunctionRegistry = sessionState.functionRegistry - @transient - protected[sql] lazy val analyzer: Analyzer = - new Analyzer(catalog, functionRegistry, conf) { - override val extendedResolutionRules = - python.ExtractPythonUDFs :: - PreInsertCastAndRename :: - (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil) - - override val extendedCheckRules = Seq( - datasources.PreWriteCheck(catalog) - ) - } + protected[sql] def analyzer: Analyzer = sessionState.analyzer - @transient - protected[sql] lazy val optimizer: Optimizer = new SparkOptimizer(this) + protected[sql] def optimizer: Optimizer = sessionState.optimizer - @transient - protected[sql] val sqlParser: ParserInterface = new SparkQl(conf) + protected[sql] def sqlParser: ParserInterface = sessionState.sqlParser + + protected[sql] def planner: sparkexecution.SparkPlanner = sessionState.planner + + protected[sql] def prepareForExecution: RuleExecutor[SparkPlan] = { + sessionState.prepareForExecution + } protected[sql] def parseSql(sql: String): LogicalPlan = sqlParser.parsePlan(sql) @@ -299,10 +297,8 @@ class SQLContext private[sql]( * * @group basic * @since 1.3.0 - * TODO move to SQLSession? */ - @transient - val udf: UDFRegistration = new UDFRegistration(this) + def udf: UDFRegistration = sessionState.udf /** * Returns true if the table is currently cached in-memory. @@ -872,25 +868,9 @@ class SQLContext private[sql]( }.toArray } - @transient - protected[sql] val planner: sparkexecution.SparkPlanner = new sparkexecution.SparkPlanner(this) - @transient protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[InternalRow], 1) - /** - * Prepares a planned SparkPlan for execution by inserting shuffle operations and internal - * row format conversions as needed. - */ - @transient - protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] { - val batches = Seq( - Batch("Subquery", Once, PlanSubqueries(self)), - Batch("Add exchange", Once, EnsureRequirements(self)), - Batch("Whole stage codegen", Once, CollapseCodegenStages(self)) - ) - } - /** * Parses the data type in our internal string representation. The data type string should * have the same format as the one generated by `toString` in scala. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/SessionState.scala index 891921b4bd0cf..eaf9f2fd90eef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SessionState.scala @@ -35,15 +35,11 @@ private[sql] class SessionState(ctx: SQLContext) { // TODO: add comments everywhere - val conf = new SQLConf + lazy val conf = new SQLConf - val catalog: Catalog = new SimpleCatalog(conf) + lazy val catalog: Catalog = new SimpleCatalog(conf) - val listenerManager: ExecutionListenerManager = new ExecutionListenerManager - - val continuousQueryManager: ContinuousQueryManager = new ContinuousQueryManager(ctx) - - val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy() + lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy() val udf: UDFRegistration = new UDFRegistration(ctx) @@ -69,11 +65,15 @@ private[sql] class SessionState(ctx: SQLContext) { * row format conversions as needed. */ val prepareForExecution = new RuleExecutor[SparkPlan] { - val batches = Seq( + override val batches: Seq[Batch] = Seq( Batch("Subquery", Once, PlanSubqueries(ctx)), Batch("Add exchange", Once, EnsureRequirements(ctx)), Batch("Whole stage codegen", Once, CollapseCodegenStages(ctx)) ) } + val listenerManager: ExecutionListenerManager = new ExecutionListenerManager + + val continuousQueryManager: ContinuousQueryManager = new ContinuousQueryManager(ctx) + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala index 28ad7ae64a517..72fea162b6a4a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.test import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.{SessionState, SQLContext} import org.apache.spark.sql.internal.SQLConf /** @@ -31,16 +31,15 @@ private[sql] class TestSQLContext(sc: SparkContext) extends SQLContext(sc) { sel new SparkConf().set("spark.sql.testkey", "true"))) } - protected[sql] override lazy val conf: SQLConf = new SQLConf { - - clear() - - override def clear(): Unit = { - super.clear() - - // Make sure we start with the default test configs even after clear - TestSQLContext.overrideConfs.foreach { - case (key, value) => setConfString(key, value) + protected[sql] override def sessionState: SessionState = new SessionState(self) { + override lazy val conf: SQLConf = { + new SQLConf { + clear() + override def clear(): Unit = { + super.clear() + // Make sure we start with the default test configs even after clear + TestSQLContext.overrideConfs.foreach { case (key, value) => setConfString(key, value) } + } } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index d511dd685ce75..cfbd121039e14 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -40,17 +40,15 @@ import org.apache.hadoop.util.VersionInfo import org.apache.spark.{Logging, SparkContext} import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.{InternalRow, ParserInterface} +import 
org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression} +import org.apache.spark.sql.catalyst.expressions.LeafExpression import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PreInsertCastAndRename, PreWriteCheck, ResolveDataSource} import org.apache.spark.sql.execution.ui.SQLListener import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand} -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.SQLConfEntry import org.apache.spark.sql.internal.SQLConf.SQLConfEntry._ import org.apache.spark.sql.types._ @@ -110,6 +108,12 @@ class HiveContext private[hive]( isRootContext = false) } + private val hiveSessionState = new HiveSessionState(self) + + protected[sql] override val sessionState = hiveSessionState + + protected[hive] val hcatalog = hiveSessionState.metastoreCatalog + /** * When true, enables an experimental feature where metastore tables that use the parquet SerDe * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive @@ -335,12 +339,12 @@ class HiveContext private[hive]( */ def refreshTable(tableName: String): Unit = { val tableIdent = sqlParser.parseTableIdentifier(tableName) - catalog.refreshTable(tableIdent) + hcatalog.refreshTable(tableIdent) } protected[hive] def invalidateTable(tableName: String): Unit = { val tableIdent = sqlParser.parseTableIdentifier(tableName) - catalog.invalidateTable(tableIdent) + hcatalog.invalidateTable(tableIdent) } /** @@ -354,7 +358,7 @@ class HiveContext private[hive]( */ def analyze(tableName: String) { val tableIdent = sqlParser.parseTableIdentifier(tableName) - val relation = EliminateSubqueryAliases(catalog.lookupRelation(tableIdent)) + val relation = EliminateSubqueryAliases(hcatalog.lookupRelation(tableIdent)) relation match { case relation: MetastoreRelation => @@ -415,7 +419,7 @@ class HiveContext private[hive]( // recorded in the Hive metastore. // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats(). if (newTotalSize > 0 && newTotalSize != oldTotalSize) { - catalog.client.alterTable( + hcatalog.client.alterTable( relation.table.copy( properties = relation.table.properties + (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString))) @@ -442,39 +446,6 @@ class HiveContext private[hive]( setConf(entry.key, entry.stringConverter(value)) } - /* A catalyst metadata catalog that points to the Hive Metastore. */ - @transient - override protected[sql] lazy val catalog = - new HiveMetastoreCatalog(metadataHive, this) with OverrideCatalog - - // Note that HiveUDFs will be overridden by functions registered in this context. - @transient - override protected[sql] lazy val functionRegistry: FunctionRegistry = - new HiveFunctionRegistry(FunctionRegistry.builtin.copy(), this.executionHive) - - // The Hive UDF current_database() is foldable, will be evaluated by optimizer, but the optimizer - // can't access the SessionState of metadataHive. - functionRegistry.registerFunction( - "current_database", - (expressions: Seq[Expression]) => new CurrentDatabase(this)) - - /* An analyzer that uses the Hive metastore. 
*/ - @transient - override protected[sql] lazy val analyzer: Analyzer = - new Analyzer(catalog, functionRegistry, conf) { - override val extendedResolutionRules = - catalog.ParquetConversions :: - catalog.CreateTables :: - catalog.PreInsertionCasts :: - python.ExtractPythonUDFs :: - PreInsertCastAndRename :: - (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil) - - override val extendedCheckRules = Seq( - PreWriteCheck(catalog) - ) - } - /** Overridden by child classes that need to set configuration before the client init. */ protected def configure(): Map[String, String] = { // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch @@ -544,37 +515,6 @@ class HiveContext private[hive]( c } - protected[sql] override lazy val conf: SQLConf = new SQLConf { - override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false) - } - - @transient - protected[sql] override val sqlParser: ParserInterface = new HiveQl(conf) - - @transient - private val hivePlanner = new SparkPlanner(this) with HiveStrategies { - val hiveContext = self - - override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq( - DataSourceStrategy, - HiveCommandStrategy(self), - HiveDDLStrategy, - DDLStrategy, - SpecialLimits, - InMemoryScans, - HiveTableScans, - DataSinks, - Scripts, - Aggregation, - LeftSemiJoin, - EquiJoinSelection, - BasicOperators, - BroadcastNestedLoop, - CartesianProduct, - DefaultJoin - ) - } - private def functionOrMacroDDLPattern(command: String) = Pattern.compile( ".*(create|drop)\\s+(temporary\\s+)?(function|macro).+", Pattern.DOTALL).matcher(command) @@ -590,9 +530,6 @@ class HiveContext private[hive]( } } - @transient - override protected[sql] val planner = hivePlanner - /** Extends QueryExecution with hive specific features. */ protected[sql] class QueryExecution(logicalPlan: LogicalPlan) extends org.apache.spark.sql.execution.QueryExecution(this, logicalPlan) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala new file mode 100644 index 0000000000000..eeb979463ca64 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.ParserInterface +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry, Catalog, OverrideCatalog} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.execution.{python, SparkPlanner} +import org.apache.spark.sql.execution.datasources.{PreWriteCheck, ResolveDataSource, PreInsertCastAndRename, DataSourceStrategy} +import org.apache.spark.sql.catalyst.expressions.Expression + + +/** + * blargh. + */ +private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) { + + // TODO: add all the comments. ALL OF THEM. + + val metastoreCatalog: HiveMetastoreCatalog = { + new HiveMetastoreCatalog(ctx.metadataHive, ctx) with OverrideCatalog + } + + override lazy val conf: SQLConf = new SQLConf { + override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false) + } + + override lazy val catalog: Catalog = metastoreCatalog + + override lazy val functionRegistry: FunctionRegistry = { + // Note that HiveUDFs will be overridden by functions registered in this context. + val registry = new HiveFunctionRegistry(FunctionRegistry.builtin.copy(), ctx.executionHive) + // The Hive UDF current_database() is foldable, will be evaluated by optimizer, but the optimizer + // can't access the SessionState of metadataHive. + registry.registerFunction( + "current_database", (expressions: Seq[Expression]) => new CurrentDatabase(ctx)) + registry + } + + /* An analyzer that uses the Hive metastore. */ + override val analyzer: Analyzer = { + new Analyzer(metastoreCatalog, functionRegistry, conf) { + override val extendedResolutionRules = + metastoreCatalog.ParquetConversions :: + metastoreCatalog.CreateTables :: + metastoreCatalog.PreInsertionCasts :: + python.ExtractPythonUDFs :: + PreInsertCastAndRename :: + (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil) + + override val extendedCheckRules = Seq(PreWriteCheck(catalog)) + } + } + + override val sqlParser: ParserInterface = new HiveQl(conf) + + override val planner: SparkPlanner = { + new SparkPlanner(ctx) with HiveStrategies { + override val hiveContext = ctx + + override def strategies: Seq[Strategy] = { + ctx.experimental.extraStrategies ++ Seq( + DataSourceStrategy, + HiveCommandStrategy(ctx), + HiveDDLStrategy, + DDLStrategy, + SpecialLimits, + InMemoryScans, + HiveTableScans, + DataSinks, + Scripts, + Aggregation, + LeftSemiJoin, + EquiJoinSelection, + BasicOperators, + BroadcastNestedLoop, + CartesianProduct, + DefaultJoin + ) + } + } + } + +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 3f81c99c41e14..3858ec18c0762 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -69,17 +69,17 @@ case class CreateTableAsSelect( withFormat } - hiveContext.catalog.client.createTable(withSchema, ignoreIfExists = false) + hiveContext.hcatalog.client.createTable(withSchema, ignoreIfExists = false) // Get the Metastore Relation - hiveContext.catalog.lookupRelation(tableIdentifier, None) match { + hiveContext.hcatalog.lookupRelation(tableIdentifier, None) match { case r: MetastoreRelation => r } } // TODO ideally, we should get the output data ready first and then // add the relation into 
catalog, just in case of failure occurs while data // processing. - if (hiveContext.catalog.tableExists(tableIdentifier)) { + if (hiveContext.hcatalog.tableExists(tableIdentifier)) { if (allowExisting) { // table already exists, will do nothing, to keep consistent with Hive } else { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala index 2914d03749321..75979402673d1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala @@ -49,14 +49,14 @@ private[hive] case class CreateViewAsSelect( override def run(sqlContext: SQLContext): Seq[Row] = { val hiveContext = sqlContext.asInstanceOf[HiveContext] - hiveContext.catalog.tableExists(tableIdentifier) match { + hiveContext.hcatalog.tableExists(tableIdentifier) match { case true if allowExisting => // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view // already exists. case true if orReplace => // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` - hiveContext.catalog.client.alertView(prepareTable(sqlContext)) + hiveContext.hcatalog.client.alertView(prepareTable(sqlContext)) case true => // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already @@ -66,7 +66,7 @@ private[hive] case class CreateViewAsSelect( "CREATE OR REPLACE VIEW AS") case false => - hiveContext.catalog.client.createView(prepareTable(sqlContext)) + hiveContext.hcatalog.client.createView(prepareTable(sqlContext)) } Seq.empty[Row] diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 145b5f7cc2dc2..b4ada96d7a013 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -46,7 +46,7 @@ case class InsertIntoHiveTable( @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext] @transient private lazy val hiveContext = new Context(sc.hiveconf) - @transient private lazy val catalog = sc.catalog + @transient private lazy val catalog = sc.hcatalog def output: Seq[Attribute] = Seq.empty diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 246b52a3b01d8..820e9da3fdaae 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -71,7 +71,7 @@ case class DropTable( } hiveContext.invalidateTable(tableName) hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName") - hiveContext.catalog.unregisterTable(TableIdentifier(tableName)) + hiveContext.hcatalog.unregisterTable(TableIdentifier(tableName)) Seq.empty[Row] } } @@ -130,7 +130,7 @@ case class CreateMetastoreDataSource( val tableName = tableIdent.unquotedString val hiveContext = sqlContext.asInstanceOf[HiveContext] - if (hiveContext.catalog.tableExists(tableIdent)) { + if (hiveContext.hcatalog.tableExists(tableIdent)) { if (allowExisting) { return Seq.empty[Row] } else { @@ -142,12 +142,12 @@ case class CreateMetastoreDataSource( val optionsWithPath = if (!options.contains("path") && managedIfNoPath) { 
isExternal = false - options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableIdent)) + options + ("path" -> hiveContext.hcatalog.hiveDefaultTableFilePath(tableIdent)) } else { options } - hiveContext.catalog.createDataSourceTable( + hiveContext.hcatalog.createDataSourceTable( tableIdent, userSpecifiedSchema, Array.empty[String], @@ -192,7 +192,7 @@ case class CreateMetastoreDataSourceAsSelect( val optionsWithPath = if (!options.contains("path")) { isExternal = false - options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableIdent)) + options + ("path" -> hiveContext.hcatalog.hiveDefaultTableFilePath(tableIdent)) } else { options } @@ -274,7 +274,7 @@ case class CreateMetastoreDataSourceAsSelect( // We will use the schema of resolved.relation as the schema of the table (instead of // the schema of df). It is important since the nullability may be changed by the relation // provider (for example, see org.apache.spark.sql.parquet.DefaultSource). - hiveContext.catalog.createDataSourceTable( + hiveContext.hcatalog.createDataSourceTable( tableIdent, Some(resolved.relation.schema), partitionColumns, @@ -285,7 +285,7 @@ case class CreateMetastoreDataSourceAsSelect( } // Refresh the cache of the table in the catalog. - hiveContext.catalog.refreshTable(tableIdent) + hiveContext.hcatalog.refreshTable(tableIdent) Seq.empty[Row] } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 9d0622bea92c7..2ee8c24e41564 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -420,9 +420,9 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { cacheManager.clearCache() loadedTables.clear() - catalog.cachedDataSourceTables.invalidateAll() - catalog.client.reset() - catalog.unregisterAllTables() + hcatalog.cachedDataSourceTables.invalidateAll() + hcatalog.client.reset() + hcatalog.unregisterAllTables() FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)). 
foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) } From 814ae1d5ccbb6680e808b280de9c454f76113f28 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 26 Feb 2016 17:41:11 -0800 Subject: [PATCH 03/16] Clean up code + add/restore comments --- .../org/apache/spark/sql/SQLContext.scala | 49 +++++++------------ .../org/apache/spark/sql/SessionState.scala | 37 ++++++++++++-- .../apache/spark/sql/UDFRegistration.scala | 5 +- .../spark/sql/hive/HiveSessionState.scala | 37 +++++++++----- 4 files changed, 76 insertions(+), 52 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 1405547065406..0b1a6cc1f61dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -30,8 +30,7 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} -import org.apache.spark.sql.{execution => sparkexecution} -import org.apache.spark.sql.catalyst.{InternalRow, _} +import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.encoders.encoderFor import org.apache.spark.sql.catalyst.expressions._ @@ -114,15 +113,26 @@ class SQLContext private[sql]( } /** - * Blargh blargh blargh track the session state of this context blargh. + * Per-session state, e.g. configuration, functions, temporary tables etc. */ @transient protected[sql] val sessionState: SessionState = new SessionState(self) + protected[sql] def conf: SQLConf = sessionState.conf + protected[sql] def catalog: Catalog = sessionState.catalog + protected[sql] def functionRegistry: FunctionRegistry = sessionState.functionRegistry + protected[sql] def analyzer: Analyzer = sessionState.analyzer + protected[sql] def optimizer: Optimizer = sessionState.optimizer + protected[sql] def sqlParser: ParserInterface = sessionState.sqlParser + protected[sql] def planner: SparkPlanner = sessionState.planner + protected[sql] def prepareForExecution: RuleExecutor[SparkPlan] = sessionState.prepareForExecution + protected[sql] def continuousQueryManager = sessionState.continuousQueryManager /** - * @return Spark SQL configuration + * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s + * that listen for execution metrics. */ - protected[sql] def conf: SQLConf = sessionState.conf + @Experimental + def listenerManager: ExecutionListenerManager = sessionState.listenerManager /** * Set Spark SQL configuration properties. @@ -184,36 +194,11 @@ class SQLContext private[sql]( */ def getAllConfs: immutable.Map[String, String] = conf.getAllConfs - /** - * Blargh blargh I listen to punk rock blargh! 
- */ - def listenerManager: ExecutionListenerManager = sessionState.listenerManager - - protected[sql] def continuousQueryManager = sessionState.continuousQueryManager - - protected[sql] def catalog: Catalog = sessionState.catalog - - protected[sql] def functionRegistry: FunctionRegistry = sessionState.functionRegistry - - protected[sql] def analyzer: Analyzer = sessionState.analyzer - - protected[sql] def optimizer: Optimizer = sessionState.optimizer - - protected[sql] def sqlParser: ParserInterface = sessionState.sqlParser - - protected[sql] def planner: sparkexecution.SparkPlanner = sessionState.planner - - protected[sql] def prepareForExecution: RuleExecutor[SparkPlan] = { - sessionState.prepareForExecution - } - protected[sql] def parseSql(sql: String): LogicalPlan = sqlParser.parsePlan(sql) - protected[sql] def executeSql(sql: String): - org.apache.spark.sql.execution.QueryExecution = executePlan(parseSql(sql)) + protected[sql] def executeSql(sql: String): QueryExecution = executePlan(parseSql(sql)) - protected[sql] def executePlan(plan: LogicalPlan) = - new sparkexecution.QueryExecution(this, plan) + protected[sql] def executePlan(plan: LogicalPlan) = new QueryExecution(this, plan) /** * Add a jar to SQLContext diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/SessionState.scala index eaf9f2fd90eef..5a19c3e5d1aac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SessionState.scala @@ -33,16 +33,29 @@ import org.apache.spark.sql.util.ExecutionListenerManager */ private[sql] class SessionState(ctx: SQLContext) { - // TODO: add comments everywhere - + /** + * SQL-specific key-value configurations. + */ lazy val conf = new SQLConf + /** + * Internal catalog for managing table and database states. + */ lazy val catalog: Catalog = new SimpleCatalog(conf) + /** + * Internal catalog for managing functions registered by the user. + */ lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy() - val udf: UDFRegistration = new UDFRegistration(ctx) + /** + * Interface exposed to the user for registering user-defined functions. + */ + val udf: UDFRegistration = new UDFRegistration(functionRegistry) + /** + * Logical query plan analyzer for resolving unresolved attributes and relations. + */ val analyzer: Analyzer = { new Analyzer(catalog, functionRegistry, conf) { override val extendedResolutionRules = @@ -54,14 +67,23 @@ private[sql] class SessionState(ctx: SQLContext) { } } + /** + * Logical query plan optimizer. + */ val optimizer: Optimizer = new SparkOptimizer(ctx) + /** + * Parser that extracts expressions, plans, table identifiers etc. from SQL texts. + */ val sqlParser: ParserInterface = new SparkQl(conf) + /** + * Planner that converts optimized logical plans to physical plans. + */ val planner: SparkPlanner = new SparkPlanner(ctx) /** - * Prepares a planned SparkPlan for execution by inserting shuffle operations and internal + * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal * row format conversions as needed. */ val prepareForExecution = new RuleExecutor[SparkPlan] { @@ -72,8 +94,15 @@ private[sql] class SessionState(ctx: SQLContext) { ) } + /** + * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s + * that listen for execution metrics. 
+ */ val listenerManager: ExecutionListenerManager = new ExecutionListenerManager + /** + * Interface to start and stop [[org.apache.spark.sql.ContinuousQuery]]s. + */ val continuousQueryManager: ContinuousQueryManager = new ContinuousQueryManager(ctx) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala index de01cbcb0e1b3..d894825632a82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala @@ -22,6 +22,7 @@ import scala.util.Try import org.apache.spark.Logging import org.apache.spark.sql.api.java._ +import org.apache.spark.sql.catalyst.analysis.FunctionRegistry import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUDF} import org.apache.spark.sql.execution.aggregate.ScalaUDAF @@ -34,9 +35,7 @@ import org.apache.spark.sql.types.DataType * * @since 1.3.0 */ -class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging { - - private val functionRegistry = sqlContext.functionRegistry +class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends Logging { protected[sql] def registerPython(name: String, udf: UserDefinedPythonFunction): Unit = { log.debug( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index eeb979463ca64..57d740fa555ce 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -19,20 +19,21 @@ package org.apache.spark.sql.hive import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.ParserInterface -import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry, Catalog, OverrideCatalog} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.execution.{python, SparkPlanner} -import org.apache.spark.sql.execution.datasources.{PreWriteCheck, ResolveDataSource, PreInsertCastAndRename, DataSourceStrategy} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, Catalog, FunctionRegistry, OverrideCatalog} import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution.{python, SparkPlanner} +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.internal.SQLConf /** - * blargh. + * A class that holds all session-specific state in a given [[HiveContext]]. */ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) { - // TODO: add all the comments. ALL OF THEM. - + /** + * A metadata catalog that points to the Hive metastore. + */ val metastoreCatalog: HiveMetastoreCatalog = { new HiveMetastoreCatalog(ctx.metadataHive, ctx) with OverrideCatalog } @@ -43,17 +44,21 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) override lazy val catalog: Catalog = metastoreCatalog + /** + * Internal catalog for managing functions registered by the user. + * Note that HiveUDFs will be overridden by functions registered in this context. + */ override lazy val functionRegistry: FunctionRegistry = { - // Note that HiveUDFs will be overridden by functions registered in this context. 
val registry = new HiveFunctionRegistry(FunctionRegistry.builtin.copy(), ctx.executionHive) - // The Hive UDF current_database() is foldable, will be evaluated by optimizer, but the optimizer - // can't access the SessionState of metadataHive. - registry.registerFunction( - "current_database", (expressions: Seq[Expression]) => new CurrentDatabase(ctx)) + // The Hive UDF current_database() is foldable, will be evaluated by optimizer, + // but the optimizer can't access the SessionState of metadataHive. + registry.registerFunction("current_database", (e: Seq[Expression]) => new CurrentDatabase(ctx)) registry } - /* An analyzer that uses the Hive metastore. */ + /** + * An analyzer that uses the Hive metastore. + */ override val analyzer: Analyzer = { new Analyzer(metastoreCatalog, functionRegistry, conf) { override val extendedResolutionRules = @@ -68,8 +73,14 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) } } + /** + * Parser for HiveQl query texts. + */ override val sqlParser: ParserInterface = new HiveQl(conf) + /** + * Planner that takes into account Hive-specific strategies. + */ override val planner: SparkPlanner = { new SparkPlanner(ctx) with HiveStrategies { override val hiveContext = ctx From 671b22694a171fcb95455b6762104250629db069 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 26 Feb 2016 18:50:10 -0800 Subject: [PATCH 04/16] Fix MiMa --- project/MimaExcludes.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 14e3c90f1b0d2..353e06bc4abe5 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -272,9 +272,13 @@ object MimaExcludes { // SPARK-13220 Deprecate yarn-client and yarn-cluster mode ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler") - ) ++ Seq ( + ) ++ Seq( // SPARK-7729 Executor which has been killed should also be displayed on Executor Tab ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.ExecutorSummary.this") + ) ++ Seq( + // SPARK-13526 Move SQLContext per-session states to new class + ProblemFilters.exclude[IncompatibleMethTypeProblem]( + "org.apache.spark.sql.UDFRegistration.this") ) ++ Seq( // [SPARK-13486][SQL] Move SQLConf into an internal package ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf"), From 0e0b464d15177a477b59adbd6c83ed5a5d03a136 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 26 Feb 2016 18:51:04 -0800 Subject: [PATCH 05/16] Move SessionState to internal package --- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 2 +- .../org/apache/spark/sql/{ => internal}/SessionState.scala | 4 ++-- .../test/scala/org/apache/spark/sql/test/TestSQLContext.scala | 4 ++-- .../scala/org/apache/spark/sql/hive/HiveSessionState.scala | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/{ => internal}/SessionState.scala (96%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 0b1a6cc1f61dc..e1ed078c82e2e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources._ import 
org.apache.spark.sql.execution.ui.{SQLListener, SQLTab} -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.{SessionState, SQLConf} import org.apache.spark.sql.internal.SQLConf.SQLConfEntry import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala similarity index 96% rename from sql/core/src/main/scala/org/apache/spark/sql/SessionState.scala rename to sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 5a19c3e5d1aac..a24033dcf8b9c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql +package org.apache.spark.sql.internal import org.apache.spark.sql.catalyst.ParserInterface import org.apache.spark.sql.catalyst.analysis.{Analyzer, Catalog, FunctionRegistry, SimpleCatalog} @@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.{PreInsertCastAndRename, ResolveDataSource} import org.apache.spark.sql.execution.exchange.EnsureRequirements -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.util.ExecutionListenerManager +import org.apache.spark.sql.{ContinuousQueryManager, UDFRegistration, SQLContext} /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala index 72fea162b6a4a..3a8b10cdc0b4b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql.test import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.sql.{SessionState, SQLContext} -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.internal.{SessionState, SQLConf} /** * A special [[SQLContext]] prepared for testing. 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 57d740fa555ce..8c66b43e53d7d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, Catalog, FunctionRegist import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.{python, SparkPlanner} import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.{SessionState, SQLConf} /** From fe0f792c5483887cbc777bd518a93c99cb33d2dd Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 26 Feb 2016 18:51:34 -0800 Subject: [PATCH 06/16] hcatalog -> hiveCatalog --- .../org/apache/spark/sql/hive/HiveContext.scala | 10 +++++----- .../sql/hive/execution/CreateTableAsSelect.scala | 6 +++--- .../sql/hive/execution/CreateViewAsSelect.scala | 6 +++--- .../sql/hive/execution/InsertIntoHiveTable.scala | 2 +- .../apache/spark/sql/hive/execution/commands.scala | 14 +++++++------- .../org/apache/spark/sql/hive/test/TestHive.scala | 6 +++--- 6 files changed, 22 insertions(+), 22 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index cfbd121039e14..472e5462ff029 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -112,7 +112,7 @@ class HiveContext private[hive]( protected[sql] override val sessionState = hiveSessionState - protected[hive] val hcatalog = hiveSessionState.metastoreCatalog + protected[hive] val hiveCatalog = hiveSessionState.metastoreCatalog /** * When true, enables an experimental feature where metastore tables that use the parquet SerDe @@ -339,12 +339,12 @@ class HiveContext private[hive]( */ def refreshTable(tableName: String): Unit = { val tableIdent = sqlParser.parseTableIdentifier(tableName) - hcatalog.refreshTable(tableIdent) + hiveCatalog.refreshTable(tableIdent) } protected[hive] def invalidateTable(tableName: String): Unit = { val tableIdent = sqlParser.parseTableIdentifier(tableName) - hcatalog.invalidateTable(tableIdent) + hiveCatalog.invalidateTable(tableIdent) } /** @@ -358,7 +358,7 @@ class HiveContext private[hive]( */ def analyze(tableName: String) { val tableIdent = sqlParser.parseTableIdentifier(tableName) - val relation = EliminateSubqueryAliases(hcatalog.lookupRelation(tableIdent)) + val relation = EliminateSubqueryAliases(hiveCatalog.lookupRelation(tableIdent)) relation match { case relation: MetastoreRelation => @@ -419,7 +419,7 @@ class HiveContext private[hive]( // recorded in the Hive metastore. // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats(). 
if (newTotalSize > 0 && newTotalSize != oldTotalSize) { - hcatalog.client.alterTable( + hiveCatalog.client.alterTable( relation.table.copy( properties = relation.table.properties + (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString))) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 3858ec18c0762..2e7de1662261f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -69,17 +69,17 @@ case class CreateTableAsSelect( withFormat } - hiveContext.hcatalog.client.createTable(withSchema, ignoreIfExists = false) + hiveContext.hiveCatalog.client.createTable(withSchema, ignoreIfExists = false) // Get the Metastore Relation - hiveContext.hcatalog.lookupRelation(tableIdentifier, None) match { + hiveContext.hiveCatalog.lookupRelation(tableIdentifier, None) match { case r: MetastoreRelation => r } } // TODO ideally, we should get the output data ready first and then // add the relation into catalog, just in case of failure occurs while data // processing. - if (hiveContext.hcatalog.tableExists(tableIdentifier)) { + if (hiveContext.hiveCatalog.tableExists(tableIdentifier)) { if (allowExisting) { // table already exists, will do nothing, to keep consistent with Hive } else { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala index 75979402673d1..b530fb8d1932c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala @@ -49,14 +49,14 @@ private[hive] case class CreateViewAsSelect( override def run(sqlContext: SQLContext): Seq[Row] = { val hiveContext = sqlContext.asInstanceOf[HiveContext] - hiveContext.hcatalog.tableExists(tableIdentifier) match { + hiveContext.hiveCatalog.tableExists(tableIdentifier) match { case true if allowExisting => // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view // already exists. case true if orReplace => // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` - hiveContext.hcatalog.client.alertView(prepareTable(sqlContext)) + hiveContext.hiveCatalog.client.alertView(prepareTable(sqlContext)) case true => // Handles `CREATE VIEW v0 AS SELECT ...`. 
Throws exception when the target view already @@ -66,7 +66,7 @@ private[hive] case class CreateViewAsSelect( "CREATE OR REPLACE VIEW AS") case false => - hiveContext.hcatalog.client.createView(prepareTable(sqlContext)) + hiveContext.hiveCatalog.client.createView(prepareTable(sqlContext)) } Seq.empty[Row] diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index b4ada96d7a013..7f404800bbe5f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -46,7 +46,7 @@ case class InsertIntoHiveTable( @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext] @transient private lazy val hiveContext = new Context(sc.hiveconf) - @transient private lazy val catalog = sc.hcatalog + @transient private lazy val catalog = sc.hiveCatalog def output: Seq[Attribute] = Seq.empty diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 820e9da3fdaae..1658dfd771c58 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -71,7 +71,7 @@ case class DropTable( } hiveContext.invalidateTable(tableName) hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName") - hiveContext.hcatalog.unregisterTable(TableIdentifier(tableName)) + hiveContext.hiveCatalog.unregisterTable(TableIdentifier(tableName)) Seq.empty[Row] } } @@ -130,7 +130,7 @@ case class CreateMetastoreDataSource( val tableName = tableIdent.unquotedString val hiveContext = sqlContext.asInstanceOf[HiveContext] - if (hiveContext.hcatalog.tableExists(tableIdent)) { + if (hiveContext.hiveCatalog.tableExists(tableIdent)) { if (allowExisting) { return Seq.empty[Row] } else { @@ -142,12 +142,12 @@ case class CreateMetastoreDataSource( val optionsWithPath = if (!options.contains("path") && managedIfNoPath) { isExternal = false - options + ("path" -> hiveContext.hcatalog.hiveDefaultTableFilePath(tableIdent)) + options + ("path" -> hiveContext.hiveCatalog.hiveDefaultTableFilePath(tableIdent)) } else { options } - hiveContext.hcatalog.createDataSourceTable( + hiveContext.hiveCatalog.createDataSourceTable( tableIdent, userSpecifiedSchema, Array.empty[String], @@ -192,7 +192,7 @@ case class CreateMetastoreDataSourceAsSelect( val optionsWithPath = if (!options.contains("path")) { isExternal = false - options + ("path" -> hiveContext.hcatalog.hiveDefaultTableFilePath(tableIdent)) + options + ("path" -> hiveContext.hiveCatalog.hiveDefaultTableFilePath(tableIdent)) } else { options } @@ -274,7 +274,7 @@ case class CreateMetastoreDataSourceAsSelect( // We will use the schema of resolved.relation as the schema of the table (instead of // the schema of df). It is important since the nullability may be changed by the relation // provider (for example, see org.apache.spark.sql.parquet.DefaultSource). - hiveContext.hcatalog.createDataSourceTable( + hiveContext.hiveCatalog.createDataSourceTable( tableIdent, Some(resolved.relation.schema), partitionColumns, @@ -285,7 +285,7 @@ case class CreateMetastoreDataSourceAsSelect( } // Refresh the cache of the table in the catalog. 
- hiveContext.hcatalog.refreshTable(tableIdent) + hiveContext.hiveCatalog.refreshTable(tableIdent) Seq.empty[Row] } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 2ee8c24e41564..dd6a6849a1911 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -420,9 +420,9 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { cacheManager.clearCache() loadedTables.clear() - hcatalog.cachedDataSourceTables.invalidateAll() - hcatalog.client.reset() - hcatalog.unregisterAllTables() + hiveCatalog.cachedDataSourceTables.invalidateAll() + hiveCatalog.client.reset() + hiveCatalog.unregisterAllTables() FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)). foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) } From 60852fffd04839d75069865fb9e5d4b5a0549f32 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 26 Feb 2016 18:53:36 -0800 Subject: [PATCH 07/16] Fix style --- .../main/scala/org/apache/spark/sql/internal/SessionState.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index a24033dcf8b9c..97f1d53ddf050 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.internal +import org.apache.spark.sql.{ContinuousQueryManager, SQLContext, UDFRegistration} import org.apache.spark.sql.catalyst.ParserInterface import org.apache.spark.sql.catalyst.analysis.{Analyzer, Catalog, FunctionRegistry, SimpleCatalog} import org.apache.spark.sql.catalyst.optimizer.Optimizer @@ -25,7 +26,6 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.{PreInsertCastAndRename, ResolveDataSource} import org.apache.spark.sql.execution.exchange.EnsureRequirements import org.apache.spark.sql.util.ExecutionListenerManager -import org.apache.spark.sql.{ContinuousQueryManager, UDFRegistration, SQLContext} /** From 775f6c2d13fc388a24a4f0f3d1ee05b2e7fcac5c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 26 Feb 2016 22:18:24 -0800 Subject: [PATCH 08/16] Fix test compile --- .../test/scala/org/apache/spark/sql/test/TestSQLContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala index 3a8b10cdc0b4b..5196c5b76ff68 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala @@ -31,7 +31,7 @@ private[sql] class TestSQLContext(sc: SparkContext) extends SQLContext(sc) { sel new SparkConf().set("spark.sql.testkey", "true"))) } - protected[sql] override def sessionState: SessionState = new SessionState(self) { + protected[sql] override val sessionState: SessionState = new SessionState(self) { override lazy val conf: SQLConf = { new SQLConf { clear() From c9af4857807e0d0219a87292d72f8bc7a0f59f89 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 26 Feb 2016 22:50:40 -0800 Subject: [PATCH 09/16] Fix test compile for reals --- 
.../apache/spark/sql/hive/HiveContext.scala | 2 +- .../hive/JavaMetastoreDataSourcesSuite.java | 2 +- .../sql/hive/HiveMetastoreCatalogSuite.scala | 6 ++--- .../sql/hive/MetastoreDataSourcesSuite.scala | 22 +++++++++---------- .../spark/sql/hive/MultiDatabaseSuite.scala | 5 +++-- .../apache/spark/sql/hive/parquetSuites.scala | 22 +++++++++---------- .../sql/sources/BucketedWriteSuite.scala | 2 +- 7 files changed, 31 insertions(+), 30 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 472e5462ff029..541dacde6d8ec 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -112,7 +112,7 @@ class HiveContext private[hive]( protected[sql] override val sessionState = hiveSessionState - protected[hive] val hiveCatalog = hiveSessionState.metastoreCatalog + protected[sql] val hiveCatalog = hiveSessionState.metastoreCatalog /** * When true, enables an experimental feature where metastore tables that use the parquet SerDe diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java index 8c4af1b8eaf44..5a440b9fd508d 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java @@ -72,7 +72,7 @@ public void setUp() throws IOException { if (path.exists()) { path.delete(); } - hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath( + hiveManagedPath = new Path(sqlContext.hiveCatalog().hiveDefaultTableFilePath( new TableIdentifier("javaSavedTable"))); fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration()); if (fs.exists(hiveManagedPath)){ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 90d65d9e9b8c0..e9710909dfd2c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -83,7 +83,7 @@ class DataSourceWithHiveMetastoreCatalogSuite .saveAsTable("t") } - val hiveTable = catalog.client.getTable("default", "t") + val hiveTable = hiveCatalog.client.getTable("default", "t") assert(hiveTable.storage.inputFormat === Some(inputFormat)) assert(hiveTable.storage.outputFormat === Some(outputFormat)) assert(hiveTable.storage.serde === Some(serde)) @@ -114,7 +114,7 @@ class DataSourceWithHiveMetastoreCatalogSuite .saveAsTable("t") } - val hiveTable = catalog.client.getTable("default", "t") + val hiveTable = hiveCatalog.client.getTable("default", "t") assert(hiveTable.storage.inputFormat === Some(inputFormat)) assert(hiveTable.storage.outputFormat === Some(outputFormat)) assert(hiveTable.storage.serde === Some(serde)) @@ -144,7 +144,7 @@ class DataSourceWithHiveMetastoreCatalogSuite |AS SELECT 1 AS d1, "val_1" AS d2 """.stripMargin) - val hiveTable = catalog.client.getTable("default", "t") + val hiveTable = hiveCatalog.client.getTable("default", "t") assert(hiveTable.storage.inputFormat === Some(inputFormat)) assert(hiveTable.storage.outputFormat === Some(outputFormat)) assert(hiveTable.storage.serde === Some(serde)) diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index cb23959c2dd57..8d50f189e2085 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -369,7 +369,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv |) """.stripMargin) - val expectedPath = catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable")) + val expectedPath = hiveCatalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable")) val filesystemPath = new Path(expectedPath) val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration) if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true) @@ -474,7 +474,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv // Drop table will also delete the data. sql("DROP TABLE savedJsonTable") intercept[IOException] { - read.json(catalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable"))) + read.json(hiveCatalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable"))) } } @@ -704,7 +704,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true))) // Manually create a metastore data source table. - catalog.createDataSourceTable( + hiveCatalog.createDataSourceTable( tableIdent = TableIdentifier("wide_schema"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], @@ -736,14 +736,14 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv outputFormat = None, serde = None, serdeProperties = Map( - "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName))) + "path" -> hiveCatalog.hiveDefaultTableFilePath(TableIdentifier(tableName))) ), properties = Map( "spark.sql.sources.provider" -> "json", "spark.sql.sources.schema" -> schema.json, "EXTERNAL" -> "FALSE")) - catalog.client.createTable(hiveTable, ignoreIfExists = false) + hiveCatalog.client.createTable(hiveTable, ignoreIfExists = false) invalidateTable(tableName) val actualSchema = table(tableName).schema @@ -758,7 +758,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv withTable(tableName) { df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName) invalidateTable(tableName) - val metastoreTable = catalog.client.getTable("default", tableName) + val metastoreTable = hiveCatalog.client.getTable("default", tableName) val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil) val numPartCols = metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt @@ -793,7 +793,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv .sortBy("c") .saveAsTable(tableName) invalidateTable(tableName) - val metastoreTable = catalog.client.getTable("default", tableName) + val metastoreTable = hiveCatalog.client.getTable("default", tableName) val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil) val expectedSortByColumns = StructType(df.schema("c") :: Nil) @@ -910,7 +910,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv test("skip hive metadata on table creation") { val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType))) - catalog.createDataSourceTable( 
+ hiveCatalog.createDataSourceTable( tableIdent = TableIdentifier("not_skip_hive_metadata"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], @@ -921,10 +921,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv // As a proxy for verifying that the table was stored in Hive compatible format, we verify that // each column of the table is of native type StringType. - assert(catalog.client.getTable("default", "not_skip_hive_metadata").schema + assert(hiveCatalog.client.getTable("default", "not_skip_hive_metadata").schema .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType)) - catalog.createDataSourceTable( + hiveCatalog.createDataSourceTable( tableIdent = TableIdentifier("skip_hive_metadata"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], @@ -935,7 +935,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv // As a proxy for verifying that the table was stored in SparkSQL format, we verify that // the table has a column type as array of StringType. - assert(catalog.client.getTable("default", "skip_hive_metadata").schema + assert(hiveCatalog.client.getTable("default", "skip_hive_metadata").schema .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == ArrayType(StringType))) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala index 488f298981c86..a8ca5ed47fd85 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala @@ -25,8 +25,9 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle private lazy val df = sqlContext.range(10).coalesce(1) private def checkTablePath(dbName: String, tableName: String): Unit = { - val metastoreTable = hiveContext.catalog.client.getTable(dbName, tableName) - val expectedPath = hiveContext.catalog.client.getDatabase(dbName).locationUri + "/" + tableName + val metastoreTable = hiveContext.hiveCatalog.client.getTable(dbName, tableName) + val expectedPath = + hiveContext.hiveCatalog.client.getDatabase(dbName).locationUri + "/" + tableName assert(metastoreTable.storage.serdeProperties("path") === expectedPath) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index a127cf6e4b7d4..a50eced18c177 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -425,9 +425,9 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { } test("Caching converted data source Parquet Relations") { - def checkCached(tableIdentifier: catalog.QualifiedTableName): Unit = { + def checkCached(tableIdentifier: hiveCatalog.QualifiedTableName): Unit = { // Converted test_parquet should be cached. 
- catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match { + hiveCatalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match { case null => fail("Converted test_parquet should be cached in the cache.") case logical @ LogicalRelation(parquetRelation: ParquetRelation, _, _) => // OK case other => @@ -452,17 +452,17 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' """.stripMargin) - var tableIdentifier = catalog.QualifiedTableName("default", "test_insert_parquet") + var tableIdentifier = hiveCatalog.QualifiedTableName("default", "test_insert_parquet") // First, make sure the converted test_parquet is not cached. - assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(hiveCatalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) // Table lookup will make the table cached. table("test_insert_parquet") checkCached(tableIdentifier) // For insert into non-partitioned table, we will do the conversion, // so the converted test_insert_parquet should be cached. invalidateTable("test_insert_parquet") - assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(hiveCatalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_insert_parquet @@ -475,7 +475,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { sql("select a, b from jt").collect()) // Invalidate the cache. invalidateTable("test_insert_parquet") - assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(hiveCatalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) // Create a partitioned table. sql( @@ -492,8 +492,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' """.stripMargin) - tableIdentifier = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test") - assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + tableIdentifier = hiveCatalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test") + assert(hiveCatalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_parquet_partitioned_cache_test @@ -502,14 +502,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin) // Right now, insert into a partitioned Parquet is not supported in data source Parquet. // So, we expect it is not cached. - assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(hiveCatalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_parquet_partitioned_cache_test |PARTITION (`date`='2015-04-02') |select a, b from jt """.stripMargin) - assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(hiveCatalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) // Make sure we can cache the partitioned table. 
table("test_parquet_partitioned_cache_test") @@ -525,7 +525,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin).collect()) invalidateTable("test_parquet_partitioned_cache_test") - assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(hiveCatalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index c37b21bed3ab0..f575c04dec2ef 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -69,7 +69,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle def tableDir: File = { val identifier = hiveContext.sqlParser.parseTableIdentifier("bucketed_table") - new File(URI.create(hiveContext.catalog.hiveDefaultTableFilePath(identifier))) + new File(URI.create(hiveContext.hiveCatalog.hiveDefaultTableFilePath(identifier))) } /** From 8f48d2a5b310eec9ef70d9f52f3d2e1fafa50374 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 26 Feb 2016 22:57:30 -0800 Subject: [PATCH 10/16] Fix style --- .../test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index a50eced18c177..ec2452cec0b73 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -492,7 +492,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' """.stripMargin) - tableIdentifier = hiveCatalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test") + tableIdentifier = + hiveCatalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test") assert(hiveCatalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) sql( """ From effe7d8c31f3e27cff762389964cbf90390c2f1c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sat, 27 Feb 2016 00:48:18 -0800 Subject: [PATCH 11/16] Fix tests Stupid constructor initialization ordering. --- .../org/apache/spark/sql/SQLContext.scala | 8 ++--- .../spark/sql/test/TestSQLContext.scala | 2 +- .../execution/HiveCompatibilitySuite.scala | 4 +-- .../apache/spark/sql/hive/HiveContext.scala | 2 +- .../apache/spark/sql/hive/test/TestHive.scala | 31 ++++++++++--------- 5 files changed, 25 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index e1ed078c82e2e..6690e66a23139 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -116,10 +116,10 @@ class SQLContext private[sql]( * Per-session state, e.g. configuration, functions, temporary tables etc. 
*/ @transient - protected[sql] val sessionState: SessionState = new SessionState(self) - protected[sql] def conf: SQLConf = sessionState.conf - protected[sql] def catalog: Catalog = sessionState.catalog - protected[sql] def functionRegistry: FunctionRegistry = sessionState.functionRegistry + protected[sql] lazy val sessionState: SessionState = new SessionState(self) + protected[sql] val conf: SQLConf = sessionState.conf + protected[sql] val catalog: Catalog = sessionState.catalog + protected[sql] val functionRegistry: FunctionRegistry = sessionState.functionRegistry protected[sql] def analyzer: Analyzer = sessionState.analyzer protected[sql] def optimizer: Optimizer = sessionState.optimizer protected[sql] def sqlParser: ParserInterface = sessionState.sqlParser diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala index 5196c5b76ff68..1823844f28ed5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala @@ -31,7 +31,7 @@ private[sql] class TestSQLContext(sc: SparkContext) extends SQLContext(sc) { sel new SparkConf().set("spark.sql.testkey", "true"))) } - protected[sql] override val sessionState: SessionState = new SessionState(self) { + protected[sql] override lazy val sessionState: SessionState = new SessionState(self) { override lazy val conf: SQLConf = { new SQLConf { clear() diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index d15f883138938..0dc2a95eea70e 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -55,7 +55,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { // Enable in-memory partition pruning for testing purposes TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true) // Use Hive hash expression instead of the native one - TestHive.functionRegistry.unregisterFunction("hash") + TestHive.sessionState.functionRegistry.unregisterFunction("hash") RuleExecutor.resetTime() } @@ -65,7 +65,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { Locale.setDefault(originalLocale) TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize) TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning) - TestHive.functionRegistry.restore() + TestHive.sessionState.functionRegistry.restore() // For debugging dump some statistics about how much time was spent in various optimizer rules. 
logWarning(RuleExecutor.dumpTimeSpent()) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 541dacde6d8ec..283857abb8c04 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -110,7 +110,7 @@ class HiveContext private[hive]( private val hiveSessionState = new HiveSessionState(self) - protected[sql] override val sessionState = hiveSessionState + protected[sql] override lazy val sessionState = hiveSessionState protected[sql] val hiveCatalog = hiveSessionState.metastoreCatalog diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index dd6a6849a1911..340a2186d6c2f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.execution.CacheTableCommand import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.client.HiveClientImpl import org.apache.spark.sql.hive.execution.HiveNativeCommand -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.{SessionState, SQLConf} import org.apache.spark.util.{ShutdownHookManager, Utils} // SPARK-3729: Test key required to check for initialization errors with config. @@ -120,18 +120,24 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { override def executePlan(plan: LogicalPlan): this.QueryExecution = new this.QueryExecution(plan) - protected[sql] override lazy val conf: SQLConf = new SQLConf { - override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false) - - clear() - - override def clear(): Unit = { - super.clear() - - TestHiveContext.overrideConfs.map { - case (key, value) => setConfString(key, value) + protected[sql] override lazy val sessionState = new HiveSessionState(this) { + override lazy val conf: SQLConf = { + new SQLConf { + clear() + override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false) + override def clear(): Unit = { + super.clear() + TestHiveContext.overrideConfs.map { + case (key, value) => setConfString(key, value) + } + } } } + + override lazy val functionRegistry = { + new TestHiveFunctionRegistry( + org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), self.executionHive) + } } /** @@ -454,9 +460,6 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { } } - @transient - override protected[sql] lazy val functionRegistry = new TestHiveFunctionRegistry( - org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), this.executionHive) } private[hive] class TestHiveFunctionRegistry(fr: SimpleFunctionRegistry, client: HiveClientImpl) From d151fd107e42697de78e251b11043f8e8e7d8cc2 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sat, 27 Feb 2016 01:04:58 -0800 Subject: [PATCH 12/16] Make diff smaller + fix Hive tests It turns out when you override a val and you don't specify the type, you can make it of a different type (one that subclasses the parent type). This makes it possible to revert some of the previous changes related to the `hiveCatalog` field. 
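A minimal sketch of the Scala behavior this commit message relies on: an overriding (lazy) val may narrow its declared type, so the subclass member exposes the more specific API without a cast. The class names below are illustrative stand-ins only, not the real Spark types.

    // Illustrative sketch, simplified stand-ins for the Spark classes.
    class Catalog {
      def lookup(name: String): String = s"table:$name"
    }
    class HiveMetastoreCatalog extends Catalog {
      def refreshTable(name: String): Unit = println(s"refreshing $name")
    }

    class SessionState {
      lazy val catalog: Catalog = new Catalog
    }

    class HiveSessionState extends SessionState {
      // No explicit type annotation: the member's type is narrowed to
      // HiveMetastoreCatalog, so Hive-specific methods remain visible.
      override lazy val catalog = new HiveMetastoreCatalog
    }

    object CovariantOverrideDemo extends App {
      val state = new HiveSessionState
      state.catalog.refreshTable("t")      // compiles, no cast needed
      println(state.catalog.lookup("t"))   // inherited behavior still works
    }

This is what lets the diff below drop the separate hiveCatalog field and have HiveContext refer to catalog directly while still seeing the HiveMetastoreCatalog-specific API.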
--- .../org/apache/spark/sql/SQLContext.scala | 23 +++++++-------- .../spark/sql/internal/SessionState.scala | 19 +++++++------ .../apache/spark/sql/hive/HiveContext.scala | 14 ++++------ .../spark/sql/hive/HiveSessionState.scala | 28 ++++++++----------- .../hive/execution/CreateTableAsSelect.scala | 6 ++-- .../hive/execution/CreateViewAsSelect.scala | 6 ++-- .../hive/execution/InsertIntoHiveTable.scala | 2 +- .../spark/sql/hive/execution/commands.scala | 14 +++++----- .../apache/spark/sql/hive/test/TestHive.scala | 8 +++--- .../hive/JavaMetastoreDataSourcesSuite.java | 2 +- .../sql/hive/HiveMetastoreCatalogSuite.scala | 6 ++-- .../sql/hive/MetastoreDataSourcesSuite.scala | 22 +++++++-------- .../spark/sql/hive/MultiDatabaseSuite.scala | 5 ++-- .../apache/spark/sql/hive/parquetSuites.scala | 23 ++++++++------- .../sql/sources/BucketedWriteSuite.scala | 2 +- 15 files changed, 88 insertions(+), 92 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 6690e66a23139..4a3ff2b4b9d16 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -117,22 +117,23 @@ class SQLContext private[sql]( */ @transient protected[sql] lazy val sessionState: SessionState = new SessionState(self) - protected[sql] val conf: SQLConf = sessionState.conf - protected[sql] val catalog: Catalog = sessionState.catalog - protected[sql] val functionRegistry: FunctionRegistry = sessionState.functionRegistry - protected[sql] def analyzer: Analyzer = sessionState.analyzer - protected[sql] def optimizer: Optimizer = sessionState.optimizer - protected[sql] def sqlParser: ParserInterface = sessionState.sqlParser - protected[sql] def planner: SparkPlanner = sessionState.planner - protected[sql] def prepareForExecution: RuleExecutor[SparkPlan] = sessionState.prepareForExecution - protected[sql] def continuousQueryManager = sessionState.continuousQueryManager + protected[sql] lazy val conf: SQLConf = sessionState.conf + protected[sql] lazy val catalog: Catalog = sessionState.catalog + protected[sql] lazy val functionRegistry: FunctionRegistry = sessionState.functionRegistry + protected[sql] lazy val analyzer: Analyzer = sessionState.analyzer + protected[sql] lazy val optimizer: Optimizer = sessionState.optimizer + protected[sql] lazy val sqlParser: ParserInterface = sessionState.sqlParser + protected[sql] lazy val planner: SparkPlanner = sessionState.planner + protected[sql] lazy val continuousQueryManager = sessionState.continuousQueryManager + protected[sql] lazy val prepareForExecution: RuleExecutor[SparkPlan] = + sessionState.prepareForExecution /** * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s * that listen for execution metrics. */ @Experimental - def listenerManager: ExecutionListenerManager = sessionState.listenerManager + lazy val listenerManager: ExecutionListenerManager = sessionState.listenerManager /** * Set Spark SQL configuration properties. @@ -283,7 +284,7 @@ class SQLContext private[sql]( * @group basic * @since 1.3.0 */ - def udf: UDFRegistration = sessionState.udf + lazy val udf: UDFRegistration = sessionState.udf /** * Returns true if the table is currently cached in-memory. 
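The SessionState.scala hunk below adds a note on why every field became a lazy val. For context, here is a minimal sketch (illustrative names, not the actual fields) of the initialization-order NPE that strict vals cause when a parent field depends on one that a subclass overrides:

    class Base {
      val conf: Map[String, String] = Map("key" -> "value")
      // Strict val: evaluated while Base's constructor runs, before any
      // subclass override of `conf` has been initialized.
      val confSize: Int = conf.size
    }

    class Sub extends Base {
      override val conf: Map[String, String] = Map("a" -> "1", "b" -> "2")
    }

    object StrictValDemo extends App {
      new Sub   // NullPointerException: Base reads Sub's still-null `conf`
    }

Declaring both fields lazy defers evaluation until first use, by which point the subclass override is initialized; that is the pattern the hunk below documents.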
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 97f1d53ddf050..f93a405f77fc7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -33,6 +33,9 @@ import org.apache.spark.sql.util.ExecutionListenerManager */ private[sql] class SessionState(ctx: SQLContext) { + // Note: These are all lazy vals because they depend on each other (e.g. conf) and we + // want subclasses to override some of the fields. Otherwise, we would get a lot of NPEs. + /** * SQL-specific key-value configurations. */ @@ -51,12 +54,12 @@ private[sql] class SessionState(ctx: SQLContext) { /** * Interface exposed to the user for registering user-defined functions. */ - val udf: UDFRegistration = new UDFRegistration(functionRegistry) + lazy val udf: UDFRegistration = new UDFRegistration(functionRegistry) /** * Logical query plan analyzer for resolving unresolved attributes and relations. */ - val analyzer: Analyzer = { + lazy val analyzer: Analyzer = { new Analyzer(catalog, functionRegistry, conf) { override val extendedResolutionRules = python.ExtractPythonUDFs :: @@ -70,23 +73,23 @@ private[sql] class SessionState(ctx: SQLContext) { /** * Logical query plan optimizer. */ - val optimizer: Optimizer = new SparkOptimizer(ctx) + lazy val optimizer: Optimizer = new SparkOptimizer(ctx) /** * Parser that extracts expressions, plans, table identifiers etc. from SQL texts. */ - val sqlParser: ParserInterface = new SparkQl(conf) + lazy val sqlParser: ParserInterface = new SparkQl(conf) /** * Planner that converts optimized logical plans to physical plans. */ - val planner: SparkPlanner = new SparkPlanner(ctx) + lazy val planner: SparkPlanner = new SparkPlanner(ctx) /** * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal * row format conversions as needed. */ - val prepareForExecution = new RuleExecutor[SparkPlan] { + lazy val prepareForExecution = new RuleExecutor[SparkPlan] { override val batches: Seq[Batch] = Seq( Batch("Subquery", Once, PlanSubqueries(ctx)), Batch("Add exchange", Once, EnsureRequirements(ctx)), @@ -98,11 +101,11 @@ private[sql] class SessionState(ctx: SQLContext) { * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s * that listen for execution metrics. */ - val listenerManager: ExecutionListenerManager = new ExecutionListenerManager + lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager /** * Interface to start and stop [[org.apache.spark.sql.ContinuousQuery]]s. 
*/ - val continuousQueryManager: ContinuousQueryManager = new ContinuousQueryManager(ctx) + lazy val continuousQueryManager: ContinuousQueryManager = new ContinuousQueryManager(ctx) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 283857abb8c04..2e6f16254ae80 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -108,11 +108,9 @@ class HiveContext private[hive]( isRootContext = false) } - private val hiveSessionState = new HiveSessionState(self) + protected[sql] override lazy val sessionState = new HiveSessionState(self) - protected[sql] override lazy val sessionState = hiveSessionState - - protected[sql] val hiveCatalog = hiveSessionState.metastoreCatalog + protected[sql] override lazy val catalog = sessionState.catalog /** * When true, enables an experimental feature where metastore tables that use the parquet SerDe @@ -339,12 +337,12 @@ class HiveContext private[hive]( */ def refreshTable(tableName: String): Unit = { val tableIdent = sqlParser.parseTableIdentifier(tableName) - hiveCatalog.refreshTable(tableIdent) + catalog.refreshTable(tableIdent) } protected[hive] def invalidateTable(tableName: String): Unit = { val tableIdent = sqlParser.parseTableIdentifier(tableName) - hiveCatalog.invalidateTable(tableIdent) + catalog.invalidateTable(tableIdent) } /** @@ -358,7 +356,7 @@ class HiveContext private[hive]( */ def analyze(tableName: String) { val tableIdent = sqlParser.parseTableIdentifier(tableName) - val relation = EliminateSubqueryAliases(hiveCatalog.lookupRelation(tableIdent)) + val relation = EliminateSubqueryAliases(catalog.lookupRelation(tableIdent)) relation match { case relation: MetastoreRelation => @@ -419,7 +417,7 @@ class HiveContext private[hive]( // recorded in the Hive metastore. // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats(). if (newTotalSize > 0 && newTotalSize != oldTotalSize) { - hiveCatalog.client.alterTable( + catalog.client.alterTable( relation.table.copy( properties = relation.table.properties + (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString))) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 8c66b43e53d7d..886fcf23cbfc0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.ParserInterface -import org.apache.spark.sql.catalyst.analysis.{Analyzer, Catalog, FunctionRegistry, OverrideCatalog} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry, OverrideCatalog} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.{python, SparkPlanner} import org.apache.spark.sql.execution.datasources._ @@ -31,18 +31,14 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf} */ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) { - /** - * A metadata catalog that points to the Hive metastore. 
- */ - val metastoreCatalog: HiveMetastoreCatalog = { - new HiveMetastoreCatalog(ctx.metadataHive, ctx) with OverrideCatalog - } - override lazy val conf: SQLConf = new SQLConf { override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false) } - override lazy val catalog: Catalog = metastoreCatalog + /** + * A metadata catalog that points to the Hive metastore. + */ + override lazy val catalog = new HiveMetastoreCatalog(ctx.metadataHive, ctx) with OverrideCatalog /** * Internal catalog for managing functions registered by the user. @@ -59,12 +55,12 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) /** * An analyzer that uses the Hive metastore. */ - override val analyzer: Analyzer = { - new Analyzer(metastoreCatalog, functionRegistry, conf) { + override lazy val analyzer: Analyzer = { + new Analyzer(catalog, functionRegistry, conf) { override val extendedResolutionRules = - metastoreCatalog.ParquetConversions :: - metastoreCatalog.CreateTables :: - metastoreCatalog.PreInsertionCasts :: + catalog.ParquetConversions :: + catalog.CreateTables :: + catalog.PreInsertionCasts :: python.ExtractPythonUDFs :: PreInsertCastAndRename :: (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil) @@ -76,12 +72,12 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) /** * Parser for HiveQl query texts. */ - override val sqlParser: ParserInterface = new HiveQl(conf) + override lazy val sqlParser: ParserInterface = new HiveQl(conf) /** * Planner that takes into account Hive-specific strategies. */ - override val planner: SparkPlanner = { + override lazy val planner: SparkPlanner = { new SparkPlanner(ctx) with HiveStrategies { override val hiveContext = ctx diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 2e7de1662261f..3f81c99c41e14 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -69,17 +69,17 @@ case class CreateTableAsSelect( withFormat } - hiveContext.hiveCatalog.client.createTable(withSchema, ignoreIfExists = false) + hiveContext.catalog.client.createTable(withSchema, ignoreIfExists = false) // Get the Metastore Relation - hiveContext.hiveCatalog.lookupRelation(tableIdentifier, None) match { + hiveContext.catalog.lookupRelation(tableIdentifier, None) match { case r: MetastoreRelation => r } } // TODO ideally, we should get the output data ready first and then // add the relation into catalog, just in case of failure occurs while data // processing. 
- if (hiveContext.hiveCatalog.tableExists(tableIdentifier)) { + if (hiveContext.catalog.tableExists(tableIdentifier)) { if (allowExisting) { // table already exists, will do nothing, to keep consistent with Hive } else { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala index b530fb8d1932c..2914d03749321 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala @@ -49,14 +49,14 @@ private[hive] case class CreateViewAsSelect( override def run(sqlContext: SQLContext): Seq[Row] = { val hiveContext = sqlContext.asInstanceOf[HiveContext] - hiveContext.hiveCatalog.tableExists(tableIdentifier) match { + hiveContext.catalog.tableExists(tableIdentifier) match { case true if allowExisting => // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view // already exists. case true if orReplace => // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` - hiveContext.hiveCatalog.client.alertView(prepareTable(sqlContext)) + hiveContext.catalog.client.alertView(prepareTable(sqlContext)) case true => // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already @@ -66,7 +66,7 @@ private[hive] case class CreateViewAsSelect( "CREATE OR REPLACE VIEW AS") case false => - hiveContext.hiveCatalog.client.createView(prepareTable(sqlContext)) + hiveContext.catalog.client.createView(prepareTable(sqlContext)) } Seq.empty[Row] diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 7f404800bbe5f..145b5f7cc2dc2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -46,7 +46,7 @@ case class InsertIntoHiveTable( @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext] @transient private lazy val hiveContext = new Context(sc.hiveconf) - @transient private lazy val catalog = sc.hiveCatalog + @transient private lazy val catalog = sc.catalog def output: Seq[Attribute] = Seq.empty diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 1658dfd771c58..246b52a3b01d8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -71,7 +71,7 @@ case class DropTable( } hiveContext.invalidateTable(tableName) hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName") - hiveContext.hiveCatalog.unregisterTable(TableIdentifier(tableName)) + hiveContext.catalog.unregisterTable(TableIdentifier(tableName)) Seq.empty[Row] } } @@ -130,7 +130,7 @@ case class CreateMetastoreDataSource( val tableName = tableIdent.unquotedString val hiveContext = sqlContext.asInstanceOf[HiveContext] - if (hiveContext.hiveCatalog.tableExists(tableIdent)) { + if (hiveContext.catalog.tableExists(tableIdent)) { if (allowExisting) { return Seq.empty[Row] } else { @@ -142,12 +142,12 @@ case class CreateMetastoreDataSource( val optionsWithPath = if (!options.contains("path") && managedIfNoPath) { isExternal = false - options + ("path" -> 
hiveContext.hiveCatalog.hiveDefaultTableFilePath(tableIdent)) + options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableIdent)) } else { options } - hiveContext.hiveCatalog.createDataSourceTable( + hiveContext.catalog.createDataSourceTable( tableIdent, userSpecifiedSchema, Array.empty[String], @@ -192,7 +192,7 @@ case class CreateMetastoreDataSourceAsSelect( val optionsWithPath = if (!options.contains("path")) { isExternal = false - options + ("path" -> hiveContext.hiveCatalog.hiveDefaultTableFilePath(tableIdent)) + options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableIdent)) } else { options } @@ -274,7 +274,7 @@ case class CreateMetastoreDataSourceAsSelect( // We will use the schema of resolved.relation as the schema of the table (instead of // the schema of df). It is important since the nullability may be changed by the relation // provider (for example, see org.apache.spark.sql.parquet.DefaultSource). - hiveContext.hiveCatalog.createDataSourceTable( + hiveContext.catalog.createDataSourceTable( tableIdent, Some(resolved.relation.schema), partitionColumns, @@ -285,7 +285,7 @@ case class CreateMetastoreDataSourceAsSelect( } // Refresh the cache of the table in the catalog. - hiveContext.hiveCatalog.refreshTable(tableIdent) + hiveContext.catalog.refreshTable(tableIdent) Seq.empty[Row] } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 340a2186d6c2f..00b6cefa7d61b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.execution.CacheTableCommand import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.client.HiveClientImpl import org.apache.spark.sql.hive.execution.HiveNativeCommand -import org.apache.spark.sql.internal.{SessionState, SQLConf} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.{ShutdownHookManager, Utils} // SPARK-3729: Test key required to check for initialization errors with config. @@ -426,9 +426,9 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { cacheManager.clearCache() loadedTables.clear() - hiveCatalog.cachedDataSourceTables.invalidateAll() - hiveCatalog.client.reset() - hiveCatalog.unregisterAllTables() + catalog.cachedDataSourceTables.invalidateAll() + catalog.client.reset() + catalog.unregisterAllTables() FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)). 
foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) } diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java index 5a440b9fd508d..8c4af1b8eaf44 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java @@ -72,7 +72,7 @@ public void setUp() throws IOException { if (path.exists()) { path.delete(); } - hiveManagedPath = new Path(sqlContext.hiveCatalog().hiveDefaultTableFilePath( + hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath( new TableIdentifier("javaSavedTable"))); fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration()); if (fs.exists(hiveManagedPath)){ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index e9710909dfd2c..90d65d9e9b8c0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -83,7 +83,7 @@ class DataSourceWithHiveMetastoreCatalogSuite .saveAsTable("t") } - val hiveTable = hiveCatalog.client.getTable("default", "t") + val hiveTable = catalog.client.getTable("default", "t") assert(hiveTable.storage.inputFormat === Some(inputFormat)) assert(hiveTable.storage.outputFormat === Some(outputFormat)) assert(hiveTable.storage.serde === Some(serde)) @@ -114,7 +114,7 @@ class DataSourceWithHiveMetastoreCatalogSuite .saveAsTable("t") } - val hiveTable = hiveCatalog.client.getTable("default", "t") + val hiveTable = catalog.client.getTable("default", "t") assert(hiveTable.storage.inputFormat === Some(inputFormat)) assert(hiveTable.storage.outputFormat === Some(outputFormat)) assert(hiveTable.storage.serde === Some(serde)) @@ -144,7 +144,7 @@ class DataSourceWithHiveMetastoreCatalogSuite |AS SELECT 1 AS d1, "val_1" AS d2 """.stripMargin) - val hiveTable = hiveCatalog.client.getTable("default", "t") + val hiveTable = catalog.client.getTable("default", "t") assert(hiveTable.storage.inputFormat === Some(inputFormat)) assert(hiveTable.storage.outputFormat === Some(outputFormat)) assert(hiveTable.storage.serde === Some(serde)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 8d50f189e2085..cb23959c2dd57 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -369,7 +369,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv |) """.stripMargin) - val expectedPath = hiveCatalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable")) + val expectedPath = catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable")) val filesystemPath = new Path(expectedPath) val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration) if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true) @@ -474,7 +474,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv // Drop table will also delete the data. 
sql("DROP TABLE savedJsonTable") intercept[IOException] { - read.json(hiveCatalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable"))) + read.json(catalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable"))) } } @@ -704,7 +704,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true))) // Manually create a metastore data source table. - hiveCatalog.createDataSourceTable( + catalog.createDataSourceTable( tableIdent = TableIdentifier("wide_schema"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], @@ -736,14 +736,14 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv outputFormat = None, serde = None, serdeProperties = Map( - "path" -> hiveCatalog.hiveDefaultTableFilePath(TableIdentifier(tableName))) + "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName))) ), properties = Map( "spark.sql.sources.provider" -> "json", "spark.sql.sources.schema" -> schema.json, "EXTERNAL" -> "FALSE")) - hiveCatalog.client.createTable(hiveTable, ignoreIfExists = false) + catalog.client.createTable(hiveTable, ignoreIfExists = false) invalidateTable(tableName) val actualSchema = table(tableName).schema @@ -758,7 +758,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv withTable(tableName) { df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName) invalidateTable(tableName) - val metastoreTable = hiveCatalog.client.getTable("default", tableName) + val metastoreTable = catalog.client.getTable("default", tableName) val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil) val numPartCols = metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt @@ -793,7 +793,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv .sortBy("c") .saveAsTable(tableName) invalidateTable(tableName) - val metastoreTable = hiveCatalog.client.getTable("default", tableName) + val metastoreTable = catalog.client.getTable("default", tableName) val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil) val expectedSortByColumns = StructType(df.schema("c") :: Nil) @@ -910,7 +910,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv test("skip hive metadata on table creation") { val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType))) - hiveCatalog.createDataSourceTable( + catalog.createDataSourceTable( tableIdent = TableIdentifier("not_skip_hive_metadata"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], @@ -921,10 +921,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv // As a proxy for verifying that the table was stored in Hive compatible format, we verify that // each column of the table is of native type StringType. 
- assert(hiveCatalog.client.getTable("default", "not_skip_hive_metadata").schema + assert(catalog.client.getTable("default", "not_skip_hive_metadata").schema .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType)) - hiveCatalog.createDataSourceTable( + catalog.createDataSourceTable( tableIdent = TableIdentifier("skip_hive_metadata"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], @@ -935,7 +935,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv // As a proxy for verifying that the table was stored in SparkSQL format, we verify that // the table has a column type as array of StringType. - assert(hiveCatalog.client.getTable("default", "skip_hive_metadata").schema + assert(catalog.client.getTable("default", "skip_hive_metadata").schema .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == ArrayType(StringType))) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala index a8ca5ed47fd85..488f298981c86 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala @@ -25,9 +25,8 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle private lazy val df = sqlContext.range(10).coalesce(1) private def checkTablePath(dbName: String, tableName: String): Unit = { - val metastoreTable = hiveContext.hiveCatalog.client.getTable(dbName, tableName) - val expectedPath = - hiveContext.hiveCatalog.client.getDatabase(dbName).locationUri + "/" + tableName + val metastoreTable = hiveContext.catalog.client.getTable(dbName, tableName) + val expectedPath = hiveContext.catalog.client.getDatabase(dbName).locationUri + "/" + tableName assert(metastoreTable.storage.serdeProperties("path") === expectedPath) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index ec2452cec0b73..a127cf6e4b7d4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -425,9 +425,9 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { } test("Caching converted data source Parquet Relations") { - def checkCached(tableIdentifier: hiveCatalog.QualifiedTableName): Unit = { + def checkCached(tableIdentifier: catalog.QualifiedTableName): Unit = { // Converted test_parquet should be cached. - hiveCatalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match { + catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match { case null => fail("Converted test_parquet should be cached in the cache.") case logical @ LogicalRelation(parquetRelation: ParquetRelation, _, _) => // OK case other => @@ -452,17 +452,17 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' """.stripMargin) - var tableIdentifier = hiveCatalog.QualifiedTableName("default", "test_insert_parquet") + var tableIdentifier = catalog.QualifiedTableName("default", "test_insert_parquet") // First, make sure the converted test_parquet is not cached. 
- assert(hiveCatalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) // Table lookup will make the table cached. table("test_insert_parquet") checkCached(tableIdentifier) // For insert into non-partitioned table, we will do the conversion, // so the converted test_insert_parquet should be cached. invalidateTable("test_insert_parquet") - assert(hiveCatalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_insert_parquet @@ -475,7 +475,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { sql("select a, b from jt").collect()) // Invalidate the cache. invalidateTable("test_insert_parquet") - assert(hiveCatalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) // Create a partitioned table. sql( @@ -492,9 +492,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' """.stripMargin) - tableIdentifier = - hiveCatalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test") - assert(hiveCatalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + tableIdentifier = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test") + assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_parquet_partitioned_cache_test @@ -503,14 +502,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin) // Right now, insert into a partitioned Parquet is not supported in data source Parquet. // So, we expect it is not cached. - assert(hiveCatalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_parquet_partitioned_cache_test |PARTITION (`date`='2015-04-02') |select a, b from jt """.stripMargin) - assert(hiveCatalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) // Make sure we can cache the partitioned table. 
table("test_parquet_partitioned_cache_test") @@ -526,7 +525,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin).collect()) invalidateTable("test_parquet_partitioned_cache_test") - assert(hiveCatalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index f575c04dec2ef..c37b21bed3ab0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -69,7 +69,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle def tableDir: File = { val identifier = hiveContext.sqlParser.parseTableIdentifier("bucketed_table") - new File(URI.create(hiveContext.hiveCatalog.hiveDefaultTableFilePath(identifier))) + new File(URI.create(hiveContext.catalog.hiveDefaultTableFilePath(identifier))) } /** From 6c970cbc38ef02bf7e0d1864d7a2a1e764d50e80 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sat, 27 Feb 2016 13:51:30 -0800 Subject: [PATCH 13/16] Make lazy vals defs --- .../org/apache/spark/sql/SQLContext.scala | 20 +++++++++---------- .../apache/spark/sql/hive/HiveContext.scala | 2 +- .../apache/spark/sql/hive/parquetSuites.scala | 7 ++++--- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 4a3ff2b4b9d16..4cbe42aa764ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -117,15 +117,15 @@ class SQLContext private[sql]( */ @transient protected[sql] lazy val sessionState: SessionState = new SessionState(self) - protected[sql] lazy val conf: SQLConf = sessionState.conf - protected[sql] lazy val catalog: Catalog = sessionState.catalog - protected[sql] lazy val functionRegistry: FunctionRegistry = sessionState.functionRegistry - protected[sql] lazy val analyzer: Analyzer = sessionState.analyzer - protected[sql] lazy val optimizer: Optimizer = sessionState.optimizer - protected[sql] lazy val sqlParser: ParserInterface = sessionState.sqlParser - protected[sql] lazy val planner: SparkPlanner = sessionState.planner - protected[sql] lazy val continuousQueryManager = sessionState.continuousQueryManager - protected[sql] lazy val prepareForExecution: RuleExecutor[SparkPlan] = + protected[sql] def conf: SQLConf = sessionState.conf + protected[sql] def catalog: Catalog = sessionState.catalog + protected[sql] def functionRegistry: FunctionRegistry = sessionState.functionRegistry + protected[sql] def analyzer: Analyzer = sessionState.analyzer + protected[sql] def optimizer: Optimizer = sessionState.optimizer + protected[sql] def sqlParser: ParserInterface = sessionState.sqlParser + protected[sql] def planner: SparkPlanner = sessionState.planner + protected[sql] def continuousQueryManager = sessionState.continuousQueryManager + protected[sql] def prepareForExecution: RuleExecutor[SparkPlan] = sessionState.prepareForExecution /** @@ -284,7 +284,7 @@ class SQLContext private[sql]( * @group basic * @since 1.3.0 */ - lazy val udf: UDFRegistration = sessionState.udf + def udf: UDFRegistration = 
sessionState.udf /** * Returns true if the table is currently cached in-memory. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 2e6f16254ae80..bab7f5b2074f9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -110,7 +110,7 @@ class HiveContext private[hive]( protected[sql] override lazy val sessionState = new HiveSessionState(self) - protected[sql] override lazy val catalog = sessionState.catalog + protected[sql] override def catalog = sessionState.catalog /** * When true, enables an experimental feature where metastore tables that use the parquet SerDe diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index a127cf6e4b7d4..d81c5668226e7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -425,7 +425,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { } test("Caching converted data source Parquet Relations") { - def checkCached(tableIdentifier: catalog.QualifiedTableName): Unit = { + val _catalog = catalog + def checkCached(tableIdentifier: _catalog.QualifiedTableName): Unit = { // Converted test_parquet should be cached. catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match { case null => fail("Converted test_parquet should be cached in the cache.") @@ -452,7 +453,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' """.stripMargin) - var tableIdentifier = catalog.QualifiedTableName("default", "test_insert_parquet") + var tableIdentifier = _catalog.QualifiedTableName("default", "test_insert_parquet") // First, make sure the converted test_parquet is not cached. 
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) @@ -492,7 +493,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' """.stripMargin) - tableIdentifier = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test") + tableIdentifier = _catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test") assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) sql( """ From d37feb40f3964aaadbf5fb90c166b5df63dd899d Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sat, 27 Feb 2016 13:53:37 -0800 Subject: [PATCH 14/16] Make sure sessionState is transient in subclasses --- .../test/scala/org/apache/spark/sql/test/TestSQLContext.scala | 1 + .../src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 1 + .../src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 1 + 3 files changed, 3 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala index 1823844f28ed5..b3e146fba80be 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala @@ -31,6 +31,7 @@ private[sql] class TestSQLContext(sc: SparkContext) extends SQLContext(sc) { sel new SparkConf().set("spark.sql.testkey", "true"))) } + @transient protected[sql] override lazy val sessionState: SessionState = new SessionState(self) { override lazy val conf: SQLConf = { new SQLConf { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index bab7f5b2074f9..31d0010677d3e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -108,6 +108,7 @@ class HiveContext private[hive]( isRootContext = false) } + @transient protected[sql] override lazy val sessionState = new HiveSessionState(self) protected[sql] override def catalog = sessionState.catalog diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 00b6cefa7d61b..a7eca46d1980d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -120,6 +120,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { override def executePlan(plan: LogicalPlan): this.QueryExecution = new this.QueryExecution(plan) + @transient protected[sql] override lazy val sessionState = new HiveSessionState(this) { override lazy val conf: SQLConf = { new SQLConf { From c76a095de92392f53a1bedd8a4bf57a1ca86b6b7 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sat, 27 Feb 2016 13:59:14 -0800 Subject: [PATCH 15/16] Fix tests Hive current_database needs to be registered in the constructor of the HiveContext. Before this commit, that was not the case because HiveSessionState's functionRegistry is lazy. The original behavior before this patch is now restored as of this commit. 
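For illustration only (a minimal sketch of the underlying Scala behavior, not code from this repository): the initializer of a lazy val, and any registration performed inside it, runs on first access rather than when the enclosing object is constructed, which is why the registration has to move into HiveContext's constructor to happen eagerly.

object LazyRegistrationDemo extends App {
  final class Registry {
    private var fns = Set.empty[String]
    def register(name: String): Unit = { fns += name; println(s"registered $name") }
    def contains(name: String): Boolean = fns.contains(name)
  }

  class Session {
    // The initializer below is deferred until `registry` is first accessed.
    lazy val registry: Registry = {
      val r = new Registry
      r.register("current_database")
      r
    }
  }

  val session = new Session
  // Nothing has been printed yet: constructing Session did not run the lazy
  // initializer, so nothing is registered at this point. The first access
  // below triggers the registration and then prints true.
  println(session.registry.contains("current_database"))
}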
--- .../main/scala/org/apache/spark/sql/hive/HiveContext.scala | 7 ++++++- .../scala/org/apache/spark/sql/hive/HiveSessionState.scala | 6 +----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 31d0010677d3e..a9295d31c07bd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -42,7 +42,7 @@ import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.expressions.LeafExpression +import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression} import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution._ @@ -113,6 +113,11 @@ class HiveContext private[hive]( protected[sql] override def catalog = sessionState.catalog + // The Hive UDF current_database() is foldable, will be evaluated by optimizer, + // but the optimizer can't access the SessionState of metadataHive. + sessionState.functionRegistry.registerFunction( + "current_database", (e: Seq[Expression]) => new CurrentDatabase(self)) + /** * When true, enables an experimental feature where metastore tables that use the parquet SerDe * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 886fcf23cbfc0..09f54be04d0c7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -45,11 +45,7 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) * Note that HiveUDFs will be overridden by functions registered in this context. */ override lazy val functionRegistry: FunctionRegistry = { - val registry = new HiveFunctionRegistry(FunctionRegistry.builtin.copy(), ctx.executionHive) - // The Hive UDF current_database() is foldable, will be evaluated by optimizer, - // but the optimizer can't access the SessionState of metadataHive. - registry.registerFunction("current_database", (e: Seq[Expression]) => new CurrentDatabase(ctx)) - registry + new HiveFunctionRegistry(FunctionRegistry.builtin.copy(), ctx.executionHive) } /** From bdf1a5d5a0cc60b6fc8cf5a6c01d75ad6c5b12aa Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sat, 27 Feb 2016 17:06:26 -0800 Subject: [PATCH 16/16] Fix tests better --- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 4cbe42aa764ab..cb4a6397b261b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -133,7 +133,7 @@ class SQLContext private[sql]( * that listen for execution metrics. 
*/ @Experimental - lazy val listenerManager: ExecutionListenerManager = sessionState.listenerManager + def listenerManager: ExecutionListenerManager = sessionState.listenerManager /** * Set Spark SQL configuration properties.
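A closing note on the lazy val to def accessor changes in patches 13 and 16: a def forwards every call to whatever sessionState currently provides and caches nothing on the context itself, whereas a lazy val freezes its first result. The sketch below is my own illustration of that mechanical difference (it uses a plain var to make the caching visible; it is not code from this patch set), and presumably this is also what lets subclasses such as TestSQLContext and TestHiveContext override sessionState without the base class retaining stale copies of conf, catalog, and the other components.

object DefVsLazyValDemo extends App {
  final class State(val name: String)

  class Ctx {
    // Stand-in for session-specific state that the accessors forward to.
    var state: State = new State("first")

    lazy val cachedName: String = state.name // evaluated once on first access, then frozen
    def currentName: String = state.name     // re-reads `state` on every call
  }

  val ctx = new Ctx
  println(ctx.cachedName)   // "first"
  println(ctx.currentName)  // "first"
  ctx.state = new State("second")
  println(ctx.cachedName)   // still "first": the lazy val cached the old value
  println(ctx.currentName)  // "second": the def follows the current state
}

Relatedly, the val _catalog = catalog pinning in the ParquetMetastoreSuite change of patch 13 appears to be a consequence of the same switch: once catalog is a def it is no longer a stable identifier, so the path-dependent type catalog.QualifiedTableName cannot be written directly, and the test captures the instance in a local val to use as a stable prefix.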