From c5fd485b17c4c1d3fc532cacd79570a4f9e4fc20 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 15 Mar 2016 00:18:45 -0700 Subject: [PATCH 1/2] [SPARK-13893][SQL] Remove SQLContext.catalog (internal method) --- .../apache/spark/sql/DataFrameReader.scala | 2 +- .../apache/spark/sql/DataFrameWriter.scala | 2 +- .../org/apache/spark/sql/SQLContext.scala | 12 +++++----- .../sql/execution/command/commands.scala | 4 ++-- .../spark/sql/execution/datasources/ddl.scala | 8 +++---- .../apache/spark/sql/ListTablesSuite.scala | 6 ++--- .../parquet/ParquetQuerySuite.scala | 4 ++-- .../apache/spark/sql/hive/HiveContext.scala | 10 ++++---- .../hive/execution/CreateTableAsSelect.scala | 6 ++--- .../hive/execution/CreateViewAsSelect.scala | 6 ++--- .../hive/execution/InsertIntoHiveTable.scala | 2 +- .../spark/sql/hive/execution/commands.scala | 19 ++++++++------- .../apache/spark/sql/hive/test/TestHive.scala | 6 ++--- .../hive/JavaMetastoreDataSourcesSuite.java | 2 +- .../sql/hive/HiveMetastoreCatalogSuite.scala | 6 ++--- .../spark/sql/hive/ListTablesSuite.scala | 4 ++-- .../sql/hive/MetastoreDataSourcesSuite.scala | 24 ++++++++++--------- .../spark/sql/hive/MultiDatabaseSuite.scala | 5 ++-- .../spark/sql/hive/StatisticsSuite.scala | 5 ++-- .../sql/hive/execution/SQLQuerySuite.scala | 5 ++-- .../spark/sql/hive/orc/OrcQuerySuite.scala | 4 ++-- .../apache/spark/sql/hive/parquetSuites.scala | 18 +++++++------- .../sql/sources/BucketedWriteSuite.scala | 2 +- 23 files changed, 83 insertions(+), 79 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 76b8d71ac9359..57c978bec8485 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -394,7 +394,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { */ def table(tableName: String): DataFrame = { Dataset.newDataFrame(sqlContext, - sqlContext.catalog.lookupRelation( + sqlContext.sessionState.catalog.lookupRelation( sqlContext.sessionState.sqlParser.parseTableIdentifier(tableName))) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index de87f4d7c24ef..5e96b716ba73d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -358,7 +358,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { } private def saveAsTable(tableIdent: TableIdentifier): Unit = { - val tableExists = df.sqlContext.catalog.tableExists(tableIdent) + val tableExists = df.sqlContext.sessionState.catalog.tableExists(tableIdent) (tableExists, mode) match { case (true, SaveMode.Ignore) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 0f5d1c8cab519..a8a75eaa431ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -120,7 +120,6 @@ class SQLContext private[sql]( @transient protected[sql] lazy val sessionState: SessionState = new SessionState(self) protected[sql] def conf: SQLConf = sessionState.conf - protected[sql] def catalog: Catalog = sessionState.catalog protected[sql] def analyzer: Analyzer = sessionState.analyzer /** @@ -699,7 +698,8 @@ class SQLContext private[sql]( * only during the lifetime of this instance of SQLContext. */ private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = { - catalog.registerTable(sessionState.sqlParser.parseTableIdentifier(tableName), df.logicalPlan) + sessionState.catalog.registerTable( + sessionState.sqlParser.parseTableIdentifier(tableName), df.logicalPlan) } /** @@ -712,7 +712,7 @@ class SQLContext private[sql]( */ def dropTempTable(tableName: String): Unit = { cacheManager.tryUncacheQuery(table(tableName)) - catalog.unregisterTable(TableIdentifier(tableName)) + sessionState.catalog.unregisterTable(TableIdentifier(tableName)) } /** @@ -797,7 +797,7 @@ class SQLContext private[sql]( } private def table(tableIdent: TableIdentifier): DataFrame = { - Dataset.newDataFrame(this, catalog.lookupRelation(tableIdent)) + Dataset.newDataFrame(this, sessionState.catalog.lookupRelation(tableIdent)) } /** @@ -839,7 +839,7 @@ class SQLContext private[sql]( * @since 1.3.0 */ def tableNames(): Array[String] = { - catalog.getTables(None).map { + sessionState.catalog.getTables(None).map { case (tableName, _) => tableName }.toArray } @@ -851,7 +851,7 @@ class SQLContext private[sql]( * @since 1.3.0 */ def tableNames(databaseName: String): Array[String] = { - catalog.getTables(Some(databaseName)).map { + sessionState.catalog.getTables(Some(databaseName)).map { case (tableName, _) => tableName }.toArray } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index e711797c1b51a..44b07e4613263 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -330,7 +330,7 @@ case class ShowTablesCommand(databaseName: Option[String]) extends RunnableComma override def run(sqlContext: SQLContext): Seq[Row] = { // Since we need to return a Seq of rows, we will call getTables directly // instead of calling tables in sqlContext. - val rows = sqlContext.catalog.getTables(databaseName).map { + val rows = sqlContext.sessionState.catalog.getTables(databaseName).map { case (tableName, isTemporary) => Row(tableName, isTemporary) } @@ -417,7 +417,7 @@ case class DescribeFunction( case class SetDatabaseCommand(databaseName: String) extends RunnableCommand { override def run(sqlContext: SQLContext): Seq[Row] = { - sqlContext.catalog.setCurrentDatabase(databaseName) + sqlContext.sessionState.catalog.setCurrentDatabase(databaseName) Seq.empty[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 04e51735c46f7..7ca0e8859a03e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -99,7 +99,7 @@ case class CreateTempTableUsing( userSpecifiedSchema = userSpecifiedSchema, className = provider, options = options) - sqlContext.catalog.registerTable( + sqlContext.sessionState.catalog.registerTable( tableIdent, Dataset.newDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan) @@ -124,7 +124,7 @@ case class CreateTempTableUsingAsSelect( bucketSpec = None, options = options) val result = dataSource.write(mode, df) - sqlContext.catalog.registerTable( + sqlContext.sessionState.catalog.registerTable( tableIdent, Dataset.newDataFrame(sqlContext, LogicalRelation(result)).logicalPlan) @@ -137,11 +137,11 @@ case class RefreshTable(tableIdent: TableIdentifier) override def run(sqlContext: SQLContext): Seq[Row] = { // Refresh the given table's metadata first. - sqlContext.catalog.refreshTable(tableIdent) + sqlContext.sessionState.catalog.refreshTable(tableIdent) // If this table is cached as a InMemoryColumnarRelation, drop the original // cached version and make the new version cached lazily. - val logicalPlan = sqlContext.catalog.lookupRelation(tableIdent) + val logicalPlan = sqlContext.sessionState.catalog.lookupRelation(tableIdent) // Use lookupCachedData directly since RefreshTable also takes databaseName. val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty if (isCached) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala index 3d7c576965fc0..2820e4fa23e13 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala @@ -33,7 +33,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex } after { - sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) + sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) } test("get all tables") { @@ -45,7 +45,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex sql("SHOW tables").filter("tableName = 'ListTablesSuiteTable'"), Row("ListTablesSuiteTable", true)) - sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) + sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0) } @@ -58,7 +58,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex sql("show TABLES in DB").filter("tableName = 'ListTablesSuiteTable'"), Row("ListTablesSuiteTable", true)) - sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) + sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index acfc1a518a0a5..fb99b0c7e2acd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -51,7 +51,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext sql("INSERT INTO TABLE t SELECT * FROM tmp") checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple)) } - sqlContext.catalog.unregisterTable(TableIdentifier("tmp")) + sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp")) } test("overwriting") { @@ -61,7 +61,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp") checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple)) } - sqlContext.catalog.unregisterTable(TableIdentifier("tmp")) + sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp")) } test("self-join") { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index a78b7b0cc4961..05fc569588658 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -113,8 +113,6 @@ class HiveContext private[hive]( @transient protected[sql] override lazy val sessionState = new HiveSessionState(self) - protected[sql] override def catalog = sessionState.catalog - // The Hive UDF current_database() is foldable, will be evaluated by optimizer, // but the optimizer can't access the SessionState of metadataHive. sessionState.functionRegistry.registerFunction( @@ -349,12 +347,12 @@ class HiveContext private[hive]( */ def refreshTable(tableName: String): Unit = { val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName) - catalog.refreshTable(tableIdent) + sessionState.catalog.refreshTable(tableIdent) } protected[hive] def invalidateTable(tableName: String): Unit = { val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName) - catalog.invalidateTable(tableIdent) + sessionState.catalog.invalidateTable(tableIdent) } /** @@ -368,7 +366,7 @@ class HiveContext private[hive]( */ def analyze(tableName: String) { val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName) - val relation = EliminateSubqueryAliases(catalog.lookupRelation(tableIdent)) + val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent)) relation match { case relation: MetastoreRelation => @@ -429,7 +427,7 @@ class HiveContext private[hive]( // recorded in the Hive metastore. // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats(). if (newTotalSize > 0 && newTotalSize != oldTotalSize) { - catalog.client.alterTable( + sessionState.catalog.client.alterTable( relation.table.copy( properties = relation.table.properties + (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString))) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 44f579fbb78d1..91425d143554a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -69,17 +69,17 @@ case class CreateTableAsSelect( withFormat } - hiveContext.catalog.client.createTable(withSchema, ignoreIfExists = false) + hiveContext.sessionState.catalog.client.createTable(withSchema, ignoreIfExists = false) // Get the Metastore Relation - hiveContext.catalog.lookupRelation(tableIdentifier, None) match { + hiveContext.sessionState.catalog.lookupRelation(tableIdentifier, None) match { case r: MetastoreRelation => r } } // TODO ideally, we should get the output data ready first and then // add the relation into catalog, just in case of failure occurs while data // processing. - if (hiveContext.catalog.tableExists(tableIdentifier)) { + if (hiveContext.sessionState.catalog.tableExists(tableIdentifier)) { if (allowExisting) { // table already exists, will do nothing, to keep consistent with Hive } else { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala index 83d057f7e4d88..6c2b88eb8c6d9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala @@ -49,14 +49,14 @@ private[hive] case class CreateViewAsSelect( override def run(sqlContext: SQLContext): Seq[Row] = { val hiveContext = sqlContext.asInstanceOf[HiveContext] - hiveContext.catalog.tableExists(tableIdentifier) match { + hiveContext.sessionState.catalog.tableExists(tableIdentifier) match { case true if allowExisting => // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view // already exists. case true if orReplace => // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` - hiveContext.catalog.client.alertView(prepareTable(sqlContext)) + hiveContext.sessionState.catalog.client.alertView(prepareTable(sqlContext)) case true => // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already @@ -66,7 +66,7 @@ private[hive] case class CreateViewAsSelect( "CREATE OR REPLACE VIEW AS") case false => - hiveContext.catalog.client.createView(prepareTable(sqlContext)) + hiveContext.sessionState.catalog.client.createView(prepareTable(sqlContext)) } Seq.empty[Row] diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index b3d38dfdb49ea..4ffd868242b86 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -45,7 +45,7 @@ case class InsertIntoHiveTable( @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext] @transient private lazy val hiveContext = new Context(sc.hiveconf) - @transient private lazy val catalog = sc.catalog + @transient private lazy val catalog = sc.sessionState.catalog def output: Seq[Attribute] = Seq.empty diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index c4723fcb82341..ff6657362013d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -71,7 +71,7 @@ case class DropTable( } hiveContext.invalidateTable(tableName) hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName") - hiveContext.catalog.unregisterTable(TableIdentifier(tableName)) + hiveContext.sessionState.catalog.unregisterTable(TableIdentifier(tableName)) Seq.empty[Row] } } @@ -130,7 +130,7 @@ case class CreateMetastoreDataSource( val tableName = tableIdent.unquotedString val hiveContext = sqlContext.asInstanceOf[HiveContext] - if (hiveContext.catalog.tableExists(tableIdent)) { + if (hiveContext.sessionState.catalog.tableExists(tableIdent)) { if (allowExisting) { return Seq.empty[Row] } else { @@ -142,7 +142,7 @@ case class CreateMetastoreDataSource( val optionsWithPath = if (!options.contains("path") && managedIfNoPath) { isExternal = false - options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableIdent)) + options + ("path" -> hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent)) } else { options } @@ -155,7 +155,7 @@ case class CreateMetastoreDataSource( bucketSpec = None, options = optionsWithPath).resolveRelation() - hiveContext.catalog.createDataSourceTable( + hiveContext.sessionState.catalog.createDataSourceTable( tableIdent, userSpecifiedSchema, Array.empty[String], @@ -200,13 +200,13 @@ case class CreateMetastoreDataSourceAsSelect( val optionsWithPath = if (!options.contains("path")) { isExternal = false - options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableIdent)) + options + ("path" -> hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent)) } else { options } var existingSchema = None: Option[StructType] - if (sqlContext.catalog.tableExists(tableIdent)) { + if (sqlContext.sessionState.catalog.tableExists(tableIdent)) { // Check if we need to throw an exception or just return. mode match { case SaveMode.ErrorIfExists => @@ -230,7 +230,8 @@ case class CreateMetastoreDataSourceAsSelect( // TODO: Check that options from the resolved relation match the relation that we are // inserting into (i.e. using the same compression). - EliminateSubqueryAliases(sqlContext.catalog.lookupRelation(tableIdent)) match { + EliminateSubqueryAliases( + sqlContext.sessionState.catalog.lookupRelation(tableIdent)) match { case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) => existingSchema = Some(l.schema) case o => @@ -267,7 +268,7 @@ case class CreateMetastoreDataSourceAsSelect( // We will use the schema of resolved.relation as the schema of the table (instead of // the schema of df). It is important since the nullability may be changed by the relation // provider (for example, see org.apache.spark.sql.parquet.DefaultSource). - hiveContext.catalog.createDataSourceTable( + hiveContext.sessionState.catalog.createDataSourceTable( tableIdent, Some(result.schema), partitionColumns, @@ -278,7 +279,7 @@ case class CreateMetastoreDataSourceAsSelect( } // Refresh the cache of the table in the catalog. - hiveContext.catalog.refreshTable(tableIdent) + hiveContext.sessionState.catalog.refreshTable(tableIdent) Seq.empty[Row] } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 5887f69e13836..73589add8d2b9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -427,9 +427,9 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { cacheManager.clearCache() loadedTables.clear() - catalog.cachedDataSourceTables.invalidateAll() - catalog.client.reset() - catalog.unregisterAllTables() + sessionState.catalog.cachedDataSourceTables.invalidateAll() + sessionState.catalog.client.reset() + sessionState.catalog.unregisterAllTables() FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)). foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) } diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java index e9356541c22df..bd14a243eaeb4 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java @@ -70,7 +70,7 @@ public void setUp() throws IOException { if (path.exists()) { path.delete(); } - hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath( + hiveManagedPath = new Path(sqlContext.sessionState().catalog().hiveDefaultTableFilePath( new TableIdentifier("javaSavedTable"))); fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration()); if (fs.exists(hiveManagedPath)){ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 90d65d9e9b8c0..ce7b08ab72f79 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -83,7 +83,7 @@ class DataSourceWithHiveMetastoreCatalogSuite .saveAsTable("t") } - val hiveTable = catalog.client.getTable("default", "t") + val hiveTable = sessionState.catalog.client.getTable("default", "t") assert(hiveTable.storage.inputFormat === Some(inputFormat)) assert(hiveTable.storage.outputFormat === Some(outputFormat)) assert(hiveTable.storage.serde === Some(serde)) @@ -114,7 +114,7 @@ class DataSourceWithHiveMetastoreCatalogSuite .saveAsTable("t") } - val hiveTable = catalog.client.getTable("default", "t") + val hiveTable = sessionState.catalog.client.getTable("default", "t") assert(hiveTable.storage.inputFormat === Some(inputFormat)) assert(hiveTable.storage.outputFormat === Some(outputFormat)) assert(hiveTable.storage.serde === Some(serde)) @@ -144,7 +144,7 @@ class DataSourceWithHiveMetastoreCatalogSuite |AS SELECT 1 AS d1, "val_1" AS d2 """.stripMargin) - val hiveTable = catalog.client.getTable("default", "t") + val hiveTable = sessionState.catalog.client.getTable("default", "t") assert(hiveTable.storage.inputFormat === Some(inputFormat)) assert(hiveTable.storage.outputFormat === Some(outputFormat)) assert(hiveTable.storage.serde === Some(serde)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala index a94f7053c39ff..0a31ac64a20f5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala @@ -32,14 +32,14 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft override def beforeAll(): Unit = { // The catalog in HiveContext is a case insensitive one. - catalog.registerTable(TableIdentifier("ListTablesSuiteTable"), df.logicalPlan) + sessionState.catalog.registerTable(TableIdentifier("ListTablesSuiteTable"), df.logicalPlan) sql("CREATE TABLE HiveListTablesSuiteTable (key int, value string)") sql("CREATE DATABASE IF NOT EXISTS ListTablesSuiteDB") sql("CREATE TABLE ListTablesSuiteDB.HiveInDBListTablesSuiteTable (key int, value string)") } override def afterAll(): Unit = { - catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) + sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable") sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable") sql("DROP DATABASE IF EXISTS ListTablesSuiteDB") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index aaebad79f6b66..d7974f1ee3ec0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -369,7 +369,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv |) """.stripMargin) - val expectedPath = catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable")) + val expectedPath = + sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable")) val filesystemPath = new Path(expectedPath) val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration) if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true) @@ -460,7 +461,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv // Drop table will also delete the data. sql("DROP TABLE savedJsonTable") intercept[AnalysisException] { - read.json(catalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable"))) + read.json( + sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable"))) } } @@ -695,7 +697,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true))) // Manually create a metastore data source table. - catalog.createDataSourceTable( + sessionState.catalog.createDataSourceTable( tableIdent = TableIdentifier("wide_schema"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], @@ -727,14 +729,14 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv outputFormat = None, serde = None, serdeProperties = Map( - "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName))) + "path" -> sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier(tableName))) ), properties = Map( "spark.sql.sources.provider" -> "json", "spark.sql.sources.schema" -> schema.json, "EXTERNAL" -> "FALSE")) - catalog.client.createTable(hiveTable, ignoreIfExists = false) + sessionState.catalog.client.createTable(hiveTable, ignoreIfExists = false) invalidateTable(tableName) val actualSchema = table(tableName).schema @@ -749,7 +751,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv withTable(tableName) { df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName) invalidateTable(tableName) - val metastoreTable = catalog.client.getTable("default", tableName) + val metastoreTable = sessionState.catalog.client.getTable("default", tableName) val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil) val numPartCols = metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt @@ -784,7 +786,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv .sortBy("c") .saveAsTable(tableName) invalidateTable(tableName) - val metastoreTable = catalog.client.getTable("default", tableName) + val metastoreTable = sessionState.catalog.client.getTable("default", tableName) val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil) val expectedSortByColumns = StructType(df.schema("c") :: Nil) @@ -901,7 +903,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv test("skip hive metadata on table creation") { val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType))) - catalog.createDataSourceTable( + sessionState.catalog.createDataSourceTable( tableIdent = TableIdentifier("not_skip_hive_metadata"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], @@ -912,10 +914,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv // As a proxy for verifying that the table was stored in Hive compatible format, we verify that // each column of the table is of native type StringType. - assert(catalog.client.getTable("default", "not_skip_hive_metadata").schema + assert(sessionState.catalog.client.getTable("default", "not_skip_hive_metadata").schema .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType)) - catalog.createDataSourceTable( + sessionState.catalog.createDataSourceTable( tableIdent = TableIdentifier("skip_hive_metadata"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], @@ -926,7 +928,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv // As a proxy for verifying that the table was stored in SparkSQL format, we verify that // the table has a column type as array of StringType. - assert(catalog.client.getTable("default", "skip_hive_metadata").schema + assert(sessionState.catalog.client.getTable("default", "skip_hive_metadata").schema .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == ArrayType(StringType))) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala index 488f298981c86..e2effef0b9bdf 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala @@ -25,8 +25,9 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle private lazy val df = sqlContext.range(10).coalesce(1) private def checkTablePath(dbName: String, tableName: String): Unit = { - val metastoreTable = hiveContext.catalog.client.getTable(dbName, tableName) - val expectedPath = hiveContext.catalog.client.getDatabase(dbName).locationUri + "/" + tableName + val metastoreTable = hiveContext.sessionState.catalog.client.getTable(dbName, tableName) + val expectedPath = + hiveContext.sessionState.catalog.client.getDatabase(dbName).locationUri + "/" + tableName assert(metastoreTable.storage.serdeProperties("path") === expectedPath) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 3952e716d38a6..1d8c293d43c05 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -73,7 +73,8 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton { test("analyze MetastoreRelations") { def queryTotalSize(tableName: String): BigInt = - hiveContext.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes + hiveContext.sessionState.catalog.lookupRelation( + TableIdentifier(tableName)).statistics.sizeInBytes // Non-partitioned table sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect() @@ -120,7 +121,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton { intercept[UnsupportedOperationException] { hiveContext.analyze("tempTable") } - hiveContext.catalog.unregisterTable(TableIdentifier("tempTable")) + hiveContext.sessionState.catalog.unregisterTable(TableIdentifier("tempTable")) } test("estimates the size of a test MetastoreRelation") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index b42f00e90f31d..21dfb82876fc2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -292,7 +292,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { test("CTAS without serde") { def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = { - val relation = EliminateSubqueryAliases(catalog.lookupRelation(TableIdentifier(tableName))) + val relation = EliminateSubqueryAliases( + sessionState.catalog.lookupRelation(TableIdentifier(tableName))) relation match { case LogicalRelation(r: HadoopFsRelation, _, _) => if (!isDataSourceParquet) { @@ -720,7 +721,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { (1 to 100).par.map { i => val tableName = s"SPARK_6618_table_$i" sql(s"CREATE TABLE $tableName (col1 string)") - catalog.lookupRelation(TableIdentifier(tableName)) + sessionState.catalog.lookupRelation(TableIdentifier(tableName)) table(tableName) tables() sql(s"DROP TABLE $tableName") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 8cfb32f00a884..57c4ad4248b72 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -222,7 +222,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { sql("INSERT INTO TABLE t SELECT * FROM tmp") checkAnswer(table("t"), (data ++ data).map(Row.fromTuple)) } - catalog.unregisterTable(TableIdentifier("tmp")) + sessionState.catalog.unregisterTable(TableIdentifier("tmp")) } test("overwriting") { @@ -232,7 +232,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp") checkAnswer(table("t"), data.map(Row.fromTuple)) } - catalog.unregisterTable(TableIdentifier("tmp")) + sessionState.catalog.unregisterTable(TableIdentifier("tmp")) } test("self-join") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 8fdbbd94c807e..bb53179c3cce3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -425,10 +425,10 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { } test("Caching converted data source Parquet Relations") { - val _catalog = catalog + val _catalog = sessionState.catalog def checkCached(tableIdentifier: _catalog.QualifiedTableName): Unit = { // Converted test_parquet should be cached. - catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match { + sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match { case null => fail("Converted test_parquet should be cached in the cache.") case logical @ LogicalRelation(parquetRelation: HadoopFsRelation, _, _) => // OK case other => @@ -456,14 +456,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { var tableIdentifier = _catalog.QualifiedTableName("default", "test_insert_parquet") // First, make sure the converted test_parquet is not cached. - assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) // Table lookup will make the table cached. table("test_insert_parquet") checkCached(tableIdentifier) // For insert into non-partitioned table, we will do the conversion, // so the converted test_insert_parquet should be cached. invalidateTable("test_insert_parquet") - assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_insert_parquet @@ -476,7 +476,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { sql("select a, b from jt").collect()) // Invalidate the cache. invalidateTable("test_insert_parquet") - assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) // Create a partitioned table. sql( @@ -494,7 +494,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin) tableIdentifier = _catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test") - assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_parquet_partitioned_cache_test @@ -503,14 +503,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin) // Right now, insert into a partitioned Parquet is not supported in data source Parquet. // So, we expect it is not cached. - assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_parquet_partitioned_cache_test |PARTITION (`date`='2015-04-02') |select a, b from jt """.stripMargin) - assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) // Make sure we can cache the partitioned table. table("test_parquet_partitioned_cache_test") @@ -526,7 +526,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin).collect()) invalidateTable("test_parquet_partitioned_cache_test") - assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index 33c1bb059e2fe..a3e7737a7c059 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -70,7 +70,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle def tableDir: File = { val identifier = hiveContext.sessionState.sqlParser.parseTableIdentifier("bucketed_table") - new File(URI.create(hiveContext.catalog.hiveDefaultTableFilePath(identifier))) + new File(URI.create(hiveContext.sessionState.catalog.hiveDefaultTableFilePath(identifier))) } /** From 49197aa1e554cc13df737cf4edb82e10e6d73ea4 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 15 Mar 2016 00:34:34 -0700 Subject: [PATCH 2/2] also remove analyzer --- .../spark/sql/DataFrameNaFunctions.scala | 8 +++---- .../apache/spark/sql/DataFrameWriter.scala | 2 +- .../scala/org/apache/spark/sql/Dataset.scala | 22 ++++++++++--------- .../org/apache/spark/sql/SQLContext.scala | 1 - .../spark/sql/execution/QueryExecution.scala | 4 ++-- .../datasources/FileSourceStrategy.scala | 3 ++- .../apache/spark/sql/hive/test/TestHive.scala | 2 +- 7 files changed, 22 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala index f7be5f6b370ab..33588ef72ffbe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala @@ -155,7 +155,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { * @since 1.3.1 */ def fill(value: Double, cols: Seq[String]): DataFrame = { - val columnEquals = df.sqlContext.analyzer.resolver + val columnEquals = df.sqlContext.sessionState.analyzer.resolver val projections = df.schema.fields.map { f => // Only fill if the column is part of the cols list. if (f.dataType.isInstanceOf[NumericType] && cols.exists(col => columnEquals(f.name, col))) { @@ -182,7 +182,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { * @since 1.3.1 */ def fill(value: String, cols: Seq[String]): DataFrame = { - val columnEquals = df.sqlContext.analyzer.resolver + val columnEquals = df.sqlContext.sessionState.analyzer.resolver val projections = df.schema.fields.map { f => // Only fill if the column is part of the cols list. if (f.dataType.isInstanceOf[StringType] && cols.exists(col => columnEquals(f.name, col))) { @@ -353,7 +353,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { case _: String => StringType } - val columnEquals = df.sqlContext.analyzer.resolver + val columnEquals = df.sqlContext.sessionState.analyzer.resolver val projections = df.schema.fields.map { f => val shouldReplace = cols.exists(colName => columnEquals(colName, f.name)) if (f.dataType.isInstanceOf[NumericType] && targetColumnType == DoubleType && shouldReplace) { @@ -382,7 +382,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { } } - val columnEquals = df.sqlContext.analyzer.resolver + val columnEquals = df.sqlContext.sessionState.analyzer.resolver val projections = df.schema.fields.map { f => values.find { case (k, _) => columnEquals(k, f.name) }.map { case (_, v) => v match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 5e96b716ba73d..9951f0fabff15 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -323,7 +323,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { */ private def normalize(columnName: String, columnType: String): String = { val validColumnNames = df.logicalPlan.output.map(_.name) - validColumnNames.find(df.sqlContext.analyzer.resolver(_, columnName)) + validColumnNames.find(df.sqlContext.sessionState.analyzer.resolver(_, columnName)) .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " + s"existing columns (${validColumnNames.mkString(", ")})")) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index ef239a1e2f324..f7ef0de21c6c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -166,15 +166,16 @@ class Dataset[T] private[sql]( private implicit def classTag = unresolvedTEncoder.clsTag protected[sql] def resolve(colName: String): NamedExpression = { - queryExecution.analyzed.resolveQuoted(colName, sqlContext.analyzer.resolver).getOrElse { - throw new AnalysisException( - s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""") - } + queryExecution.analyzed.resolveQuoted(colName, sqlContext.sessionState.analyzer.resolver) + .getOrElse { + throw new AnalysisException( + s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""") + } } protected[sql] def numericColumns: Seq[Expression] = { schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n => - queryExecution.analyzed.resolveQuoted(n.name, sqlContext.analyzer.resolver).get + queryExecution.analyzed.resolveQuoted(n.name, sqlContext.sessionState.analyzer.resolver).get } } @@ -1400,7 +1401,7 @@ class Dataset[T] private[sql]( * @since 1.3.0 */ def withColumn(colName: String, col: Column): DataFrame = { - val resolver = sqlContext.analyzer.resolver + val resolver = sqlContext.sessionState.analyzer.resolver val output = queryExecution.analyzed.output val shouldReplace = output.exists(f => resolver(f.name, colName)) if (shouldReplace) { @@ -1421,7 +1422,7 @@ class Dataset[T] private[sql]( * Returns a new [[DataFrame]] by adding a column with metadata. */ private[spark] def withColumn(colName: String, col: Column, metadata: Metadata): DataFrame = { - val resolver = sqlContext.analyzer.resolver + val resolver = sqlContext.sessionState.analyzer.resolver val output = queryExecution.analyzed.output val shouldReplace = output.exists(f => resolver(f.name, colName)) if (shouldReplace) { @@ -1445,7 +1446,7 @@ class Dataset[T] private[sql]( * @since 1.3.0 */ def withColumnRenamed(existingName: String, newName: String): DataFrame = { - val resolver = sqlContext.analyzer.resolver + val resolver = sqlContext.sessionState.analyzer.resolver val output = queryExecution.analyzed.output val shouldRename = output.exists(f => resolver(f.name, existingName)) if (shouldRename) { @@ -1480,7 +1481,7 @@ class Dataset[T] private[sql]( */ @scala.annotation.varargs def drop(colNames: String*): DataFrame = { - val resolver = sqlContext.analyzer.resolver + val resolver = sqlContext.sessionState.analyzer.resolver val remainingCols = schema.filter(f => colNames.forall(n => !resolver(f.name, n))).map(f => Column(f.name)) if (remainingCols.size == this.schema.size) { @@ -1501,7 +1502,8 @@ class Dataset[T] private[sql]( def drop(col: Column): DataFrame = { val expression = col match { case Column(u: UnresolvedAttribute) => - queryExecution.analyzed.resolveQuoted(u.name, sqlContext.analyzer.resolver).getOrElse(u) + queryExecution.analyzed.resolveQuoted( + u.name, sqlContext.sessionState.analyzer.resolver).getOrElse(u) case Column(expr: Expression) => expr } val attrs = this.logicalPlan.output diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index a8a75eaa431ac..177d78c4c08f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -120,7 +120,6 @@ class SQLContext private[sql]( @transient protected[sql] lazy val sessionState: SessionState = new SessionState(self) protected[sql] def conf: SQLConf = sessionState.conf - protected[sql] def analyzer: Analyzer = sessionState.analyzer /** * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 5b4254f741ab1..912b84abc187c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -31,14 +31,14 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} */ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) { - def assertAnalyzed(): Unit = try sqlContext.analyzer.checkAnalysis(analyzed) catch { + def assertAnalyzed(): Unit = try sqlContext.sessionState.analyzer.checkAnalysis(analyzed) catch { case e: AnalysisException => val ae = new AnalysisException(e.message, e.line, e.startPosition, Some(analyzed)) ae.setStackTrace(e.getStackTrace) throw ae } - lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical) + lazy val analyzed: LogicalPlan = sqlContext.sessionState.analyzer.execute(logical) lazy val withCachedData: LogicalPlan = { assertAnalyzed() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index ef95d5d28961f..84e98c0f9e0f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -67,7 +67,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { val filterSet = ExpressionSet(filters) val partitionColumns = - AttributeSet(l.resolve(files.partitionSchema, files.sqlContext.analyzer.resolver)) + AttributeSet( + l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver)) val partitionKeyFilters = ExpressionSet(filters.filter(_.references.subsetOf(partitionColumns))) logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 73589add8d2b9..19c05f9cb0d9c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -205,7 +205,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}") referencedTestTables.foreach(loadTestTable) // Proceed with analysis. - analyzer.execute(logical) + sessionState.analyzer.execute(logical) } }