[SPARK-13893][SQL] Remove SQLContext.catalog/analyzer (internal method) #11716

Status: Closed · wants to merge 2 commits · showing changes from 1 commit
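
Every change in this diff follows one mechanical pattern: call sites that went through the `SQLContext.catalog` forwarder now reach the session-scoped catalog via `sessionState.catalog`. A minimal sketch of the shape of the refactor, using simplified stub classes for illustration only (not the real Spark classes):

    // Simplified stubs for illustration; not the actual Spark classes.
    class Catalog {
      def tableExists(name: String): Boolean = false // stub implementation
    }

    class SessionState {
      // Session-scoped components (catalog, analyzer, parser, ...) live here.
      val catalog: Catalog = new Catalog
    }

    class SQLContext {
      lazy val sessionState: SessionState = new SessionState
      // Before this PR, SQLContext also exposed a thin forwarder:
      //   protected[sql] def catalog: Catalog = sessionState.catalog
      // The PR deletes it, so internal callers spell out the full path.
    }

    object Example extends App {
      val sqlContext = new SQLContext
      // Old call site: sqlContext.catalog.tableExists("t")
      // New call site goes through sessionState explicitly:
      println(sqlContext.sessionState.catalog.tableExists("t"))
    }

Removing the forwarder makes it explicit at each call site that the catalog is per-session state owned by `SessionState`, not a context-level field.
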
@@ -394,7 +394,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    */
   def table(tableName: String): DataFrame = {
     Dataset.newDataFrame(sqlContext,
-      sqlContext.catalog.lookupRelation(
+      sqlContext.sessionState.catalog.lookupRelation(
        sqlContext.sessionState.sqlParser.parseTableIdentifier(tableName)))
   }

@@ -358,7 +358,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }

   private def saveAsTable(tableIdent: TableIdentifier): Unit = {
-    val tableExists = df.sqlContext.catalog.tableExists(tableIdent)
+    val tableExists = df.sqlContext.sessionState.catalog.tableExists(tableIdent)

     (tableExists, mode) match {
       case (true, SaveMode.Ignore) =>

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala (6 additions, 6 deletions)
@@ -120,7 +120,6 @@ class SQLContext private[sql](
   @transient
   protected[sql] lazy val sessionState: SessionState = new SessionState(self)
   protected[sql] def conf: SQLConf = sessionState.conf
-  protected[sql] def catalog: Catalog = sessionState.catalog
   protected[sql] def analyzer: Analyzer = sessionState.analyzer

   /**
@@ -699,7 +698,8 @@ class SQLContext private[sql](
    * only during the lifetime of this instance of SQLContext.
    */
   private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
-    catalog.registerTable(sessionState.sqlParser.parseTableIdentifier(tableName), df.logicalPlan)
+    sessionState.catalog.registerTable(
+      sessionState.sqlParser.parseTableIdentifier(tableName), df.logicalPlan)
   }

   /**
@@ -712,7 +712,7 @@ class SQLContext private[sql](
    */
   def dropTempTable(tableName: String): Unit = {
     cacheManager.tryUncacheQuery(table(tableName))
-    catalog.unregisterTable(TableIdentifier(tableName))
+    sessionState.catalog.unregisterTable(TableIdentifier(tableName))
   }

   /**
@@ -797,7 +797,7 @@ class SQLContext private[sql](
   }

   private def table(tableIdent: TableIdentifier): DataFrame = {
-    Dataset.newDataFrame(this, catalog.lookupRelation(tableIdent))
+    Dataset.newDataFrame(this, sessionState.catalog.lookupRelation(tableIdent))
   }

   /**
@@ -839,7 +839,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def tableNames(): Array[String] = {
-    catalog.getTables(None).map {
+    sessionState.catalog.getTables(None).map {
       case (tableName, _) => tableName
     }.toArray
   }
@@ -851,7 +851,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def tableNames(databaseName: String): Array[String] = {
-    catalog.getTables(Some(databaseName)).map {
+    sessionState.catalog.getTables(Some(databaseName)).map {
       case (tableName, _) => tableName
     }.toArray
   }

@@ -330,7 +330,7 @@ case class ShowTablesCommand(databaseName: Option[String]) extends RunnableCommand {
   override def run(sqlContext: SQLContext): Seq[Row] = {
     // Since we need to return a Seq of rows, we will call getTables directly
     // instead of calling tables in sqlContext.
-    val rows = sqlContext.catalog.getTables(databaseName).map {
+    val rows = sqlContext.sessionState.catalog.getTables(databaseName).map {
       case (tableName, isTemporary) => Row(tableName, isTemporary)
     }

@@ -417,7 +417,7 @@ case class DescribeFunction(
 case class SetDatabaseCommand(databaseName: String) extends RunnableCommand {

   override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.catalog.setCurrentDatabase(databaseName)
+    sqlContext.sessionState.catalog.setCurrentDatabase(databaseName)
     Seq.empty[Row]
   }

@@ -99,7 +99,7 @@ case class CreateTempTableUsing(
       userSpecifiedSchema = userSpecifiedSchema,
       className = provider,
       options = options)
-    sqlContext.catalog.registerTable(
+    sqlContext.sessionState.catalog.registerTable(
       tableIdent,
       Dataset.newDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan)

@@ -124,7 +124,7 @@ case class CreateTempTableUsingAsSelect(
       bucketSpec = None,
       options = options)
     val result = dataSource.write(mode, df)
-    sqlContext.catalog.registerTable(
+    sqlContext.sessionState.catalog.registerTable(
       tableIdent,
       Dataset.newDataFrame(sqlContext, LogicalRelation(result)).logicalPlan)

@@ -137,11 +137,11 @@ case class RefreshTable(tableIdent: TableIdentifier)

   override def run(sqlContext: SQLContext): Seq[Row] = {
     // Refresh the given table's metadata first.
-    sqlContext.catalog.refreshTable(tableIdent)
+    sqlContext.sessionState.catalog.refreshTable(tableIdent)

     // If this table is cached as a InMemoryColumnarRelation, drop the original
     // cached version and make the new version cached lazily.
-    val logicalPlan = sqlContext.catalog.lookupRelation(tableIdent)
+    val logicalPlan = sqlContext.sessionState.catalog.lookupRelation(tableIdent)
     // Use lookupCachedData directly since RefreshTable also takes databaseName.
     val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
     if (isCached) {

@@ -33,7 +33,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {
   }

   after {
-    sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
+    sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
   }

   test("get all tables") {
@@ -45,7 +45,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {
       sql("SHOW tables").filter("tableName = 'ListTablesSuiteTable'"),
       Row("ListTablesSuiteTable", true))

-    sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
+    sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
     assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
   }

@@ -58,7 +58,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {
       sql("show TABLES in DB").filter("tableName = 'ListTablesSuiteTable'"),
       Row("ListTablesSuiteTable", true))

-    sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
+    sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
     assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
   }

@@ -51,7 +51,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext {
       sql("INSERT INTO TABLE t SELECT * FROM tmp")
       checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple))
     }
-    sqlContext.catalog.unregisterTable(TableIdentifier("tmp"))
+    sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
   }

   test("overwriting") {
@@ -61,7 +61,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext {
       sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
       checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple))
     }
-    sqlContext.catalog.unregisterTable(TableIdentifier("tmp"))
+    sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
   }

   test("self-join") {

@@ -113,8 +113,6 @@ class HiveContext private[hive](
   @transient
   protected[sql] override lazy val sessionState = new HiveSessionState(self)

-  protected[sql] override def catalog = sessionState.catalog
-
   // The Hive UDF current_database() is foldable, will be evaluated by optimizer,
   // but the optimizer can't access the SessionState of metadataHive.
   sessionState.functionRegistry.registerFunction(

Review thread on the removed override:
Contributor: looks like this is the last one :)
Contributor (author): there are more actually ... in sqlcontext
@@ -349,12 +347,12 @@ class HiveContext private[hive](
    */
   def refreshTable(tableName: String): Unit = {
     val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    catalog.refreshTable(tableIdent)
+    sessionState.catalog.refreshTable(tableIdent)
   }

   protected[hive] def invalidateTable(tableName: String): Unit = {
     val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    catalog.invalidateTable(tableIdent)
+    sessionState.catalog.invalidateTable(tableIdent)
   }

   /**
@@ -368,7 +366,7 @@ class HiveContext private[hive](
    */
   def analyze(tableName: String) {
     val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    val relation = EliminateSubqueryAliases(catalog.lookupRelation(tableIdent))
+    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))

     relation match {
       case relation: MetastoreRelation =>
@@ -429,7 +427,7 @@ class HiveContext private[hive](
         // recorded in the Hive metastore.
         // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
         if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
-          catalog.client.alterTable(
+          sessionState.catalog.client.alterTable(
             relation.table.copy(
               properties = relation.table.properties +
                 (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))

@@ -69,17 +69,17 @@ case class CreateTableAsSelect(
         withFormat
       }

-      hiveContext.catalog.client.createTable(withSchema, ignoreIfExists = false)
+      hiveContext.sessionState.catalog.client.createTable(withSchema, ignoreIfExists = false)

       // Get the Metastore Relation
-      hiveContext.catalog.lookupRelation(tableIdentifier, None) match {
+      hiveContext.sessionState.catalog.lookupRelation(tableIdentifier, None) match {
         case r: MetastoreRelation => r
       }
     }
     // TODO ideally, we should get the output data ready first and then
     // add the relation into catalog, just in case of failure occurs while data
     // processing.
-    if (hiveContext.catalog.tableExists(tableIdentifier)) {
+    if (hiveContext.sessionState.catalog.tableExists(tableIdentifier)) {
       if (allowExisting) {
         // table already exists, will do nothing, to keep consistent with Hive
       } else {

@@ -49,14 +49,14 @@ private[hive] case class CreateViewAsSelect(
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]

-    hiveContext.catalog.tableExists(tableIdentifier) match {
+    hiveContext.sessionState.catalog.tableExists(tableIdentifier) match {
       case true if allowExisting =>
         // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
         // already exists.

       case true if orReplace =>
         // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
-        hiveContext.catalog.client.alertView(prepareTable(sqlContext))
+        hiveContext.sessionState.catalog.client.alertView(prepareTable(sqlContext))

       case true =>
         // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
@@ -66,7 +66,7 @@ private[hive] case class CreateViewAsSelect(
           "CREATE OR REPLACE VIEW AS")

       case false =>
-        hiveContext.catalog.client.createView(prepareTable(sqlContext))
+        hiveContext.sessionState.catalog.client.createView(prepareTable(sqlContext))
     }

     Seq.empty[Row]

@@ -45,7 +45,7 @@ case class InsertIntoHiveTable(

   @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
-  @transient private lazy val catalog = sc.catalog
+  @transient private lazy val catalog = sc.sessionState.catalog

   def output: Seq[Attribute] = Seq.empty

@@ -71,7 +71,7 @@ case class DropTable(
     }
     hiveContext.invalidateTable(tableName)
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
-    hiveContext.catalog.unregisterTable(TableIdentifier(tableName))
+    hiveContext.sessionState.catalog.unregisterTable(TableIdentifier(tableName))
     Seq.empty[Row]
   }
 }
@@ -130,7 +130,7 @@ case class CreateMetastoreDataSource(
     val tableName = tableIdent.unquotedString
     val hiveContext = sqlContext.asInstanceOf[HiveContext]

-    if (hiveContext.catalog.tableExists(tableIdent)) {
+    if (hiveContext.sessionState.catalog.tableExists(tableIdent)) {
       if (allowExisting) {
         return Seq.empty[Row]
       } else {
@@ -142,7 +142,7 @@ case class CreateMetastoreDataSource(
     val optionsWithPath =
       if (!options.contains("path") && managedIfNoPath) {
         isExternal = false
-        options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableIdent))
+        options + ("path" -> hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
       } else {
         options
       }
@@ -155,7 +155,7 @@ case class CreateMetastoreDataSource(
       bucketSpec = None,
       options = optionsWithPath).resolveRelation()

-    hiveContext.catalog.createDataSourceTable(
+    hiveContext.sessionState.catalog.createDataSourceTable(
       tableIdent,
       userSpecifiedSchema,
       Array.empty[String],
@@ -200,13 +200,13 @@ case class CreateMetastoreDataSourceAsSelect(
     val optionsWithPath =
       if (!options.contains("path")) {
         isExternal = false
-        options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableIdent))
+        options + ("path" -> hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
       } else {
         options
       }

     var existingSchema = None: Option[StructType]
-    if (sqlContext.catalog.tableExists(tableIdent)) {
+    if (sqlContext.sessionState.catalog.tableExists(tableIdent)) {
       // Check if we need to throw an exception or just return.
       mode match {
         case SaveMode.ErrorIfExists =>
@@ -230,7 +230,8 @@ case class CreateMetastoreDataSourceAsSelect(
         // TODO: Check that options from the resolved relation match the relation that we are
         // inserting into (i.e. using the same compression).

-        EliminateSubqueryAliases(sqlContext.catalog.lookupRelation(tableIdent)) match {
+        EliminateSubqueryAliases(
+          sqlContext.sessionState.catalog.lookupRelation(tableIdent)) match {
           case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
             existingSchema = Some(l.schema)
           case o =>
@@ -267,7 +268,7 @@ case class CreateMetastoreDataSourceAsSelect(
         // We will use the schema of resolved.relation as the schema of the table (instead of
         // the schema of df). It is important since the nullability may be changed by the relation
         // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
-        hiveContext.catalog.createDataSourceTable(
+        hiveContext.sessionState.catalog.createDataSourceTable(
           tableIdent,
           Some(result.schema),
           partitionColumns,
@@ -278,7 +279,7 @@ case class CreateMetastoreDataSourceAsSelect(
     }

     // Refresh the cache of the table in the catalog.
-    hiveContext.catalog.refreshTable(tableIdent)
+    hiveContext.sessionState.catalog.refreshTable(tableIdent)
     Seq.empty[Row]
   }
 }

@@ -427,9 +427,9 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {

       cacheManager.clearCache()
       loadedTables.clear()
-      catalog.cachedDataSourceTables.invalidateAll()
-      catalog.client.reset()
-      catalog.unregisterAllTables()
+      sessionState.catalog.cachedDataSourceTables.invalidateAll()
+      sessionState.catalog.client.reset()
+      sessionState.catalog.unregisterAllTables()

       FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)).
         foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }

@@ -70,7 +70,7 @@ public void setUp() throws IOException {
     if (path.exists()) {
       path.delete();
     }
-    hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath(
+    hiveManagedPath = new Path(sqlContext.sessionState().catalog().hiveDefaultTableFilePath(
       new TableIdentifier("javaSavedTable")));
     fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
     if (fs.exists(hiveManagedPath)){

@@ -83,7 +83,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
           .saveAsTable("t")
       }

-      val hiveTable = catalog.client.getTable("default", "t")
+      val hiveTable = sessionState.catalog.client.getTable("default", "t")
       assert(hiveTable.storage.inputFormat === Some(inputFormat))
       assert(hiveTable.storage.outputFormat === Some(outputFormat))
       assert(hiveTable.storage.serde === Some(serde))
@@ -114,7 +114,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
           .saveAsTable("t")
       }

-      val hiveTable = catalog.client.getTable("default", "t")
+      val hiveTable = sessionState.catalog.client.getTable("default", "t")
       assert(hiveTable.storage.inputFormat === Some(inputFormat))
       assert(hiveTable.storage.outputFormat === Some(outputFormat))
       assert(hiveTable.storage.serde === Some(serde))
@@ -144,7 +144,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
             |AS SELECT 1 AS d1, "val_1" AS d2
           """.stripMargin)

-      val hiveTable = catalog.client.getTable("default", "t")
+      val hiveTable = sessionState.catalog.client.getTable("default", "t")
       assert(hiveTable.storage.inputFormat === Some(inputFormat))
       assert(hiveTable.storage.outputFormat === Some(outputFormat))
       assert(hiveTable.storage.serde === Some(serde))