Skip to content

Commit

Permalink
[SPARK-13893][SQL] Remove SQLContext.catalog (internal method)
Browse files Browse the repository at this point in the history
  • Loading branch information
rxin committed Mar 15, 2016
1 parent 276c2d5 commit c5fd485
Show file tree
Hide file tree
Showing 23 changed files with 83 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
*/
def table(tableName: String): DataFrame = {
Dataset.newDataFrame(sqlContext,
sqlContext.catalog.lookupRelation(
sqlContext.sessionState.catalog.lookupRelation(
sqlContext.sessionState.sqlParser.parseTableIdentifier(tableName)))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
}

private def saveAsTable(tableIdent: TableIdentifier): Unit = {
val tableExists = df.sqlContext.catalog.tableExists(tableIdent)
val tableExists = df.sqlContext.sessionState.catalog.tableExists(tableIdent)

(tableExists, mode) match {
case (true, SaveMode.Ignore) =>
Expand Down
12 changes: 6 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ class SQLContext private[sql](
@transient
protected[sql] lazy val sessionState: SessionState = new SessionState(self)
protected[sql] def conf: SQLConf = sessionState.conf
protected[sql] def catalog: Catalog = sessionState.catalog
protected[sql] def analyzer: Analyzer = sessionState.analyzer

/**
Expand Down Expand Up @@ -699,7 +698,8 @@ class SQLContext private[sql](
* only during the lifetime of this instance of SQLContext.
*/
private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
catalog.registerTable(sessionState.sqlParser.parseTableIdentifier(tableName), df.logicalPlan)
sessionState.catalog.registerTable(
sessionState.sqlParser.parseTableIdentifier(tableName), df.logicalPlan)
}

/**
Expand All @@ -712,7 +712,7 @@ class SQLContext private[sql](
*/
def dropTempTable(tableName: String): Unit = {
cacheManager.tryUncacheQuery(table(tableName))
catalog.unregisterTable(TableIdentifier(tableName))
sessionState.catalog.unregisterTable(TableIdentifier(tableName))
}

/**
Expand Down Expand Up @@ -797,7 +797,7 @@ class SQLContext private[sql](
}

private def table(tableIdent: TableIdentifier): DataFrame = {
Dataset.newDataFrame(this, catalog.lookupRelation(tableIdent))
Dataset.newDataFrame(this, sessionState.catalog.lookupRelation(tableIdent))
}

/**
Expand Down Expand Up @@ -839,7 +839,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def tableNames(): Array[String] = {
catalog.getTables(None).map {
sessionState.catalog.getTables(None).map {
case (tableName, _) => tableName
}.toArray
}
Expand All @@ -851,7 +851,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def tableNames(databaseName: String): Array[String] = {
catalog.getTables(Some(databaseName)).map {
sessionState.catalog.getTables(Some(databaseName)).map {
case (tableName, _) => tableName
}.toArray
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ case class ShowTablesCommand(databaseName: Option[String]) extends RunnableComma
override def run(sqlContext: SQLContext): Seq[Row] = {
// Since we need to return a Seq of rows, we will call getTables directly
// instead of calling tables in sqlContext.
val rows = sqlContext.catalog.getTables(databaseName).map {
val rows = sqlContext.sessionState.catalog.getTables(databaseName).map {
case (tableName, isTemporary) => Row(tableName, isTemporary)
}

Expand Down Expand Up @@ -417,7 +417,7 @@ case class DescribeFunction(
case class SetDatabaseCommand(databaseName: String) extends RunnableCommand {

override def run(sqlContext: SQLContext): Seq[Row] = {
sqlContext.catalog.setCurrentDatabase(databaseName)
sqlContext.sessionState.catalog.setCurrentDatabase(databaseName)
Seq.empty[Row]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ case class CreateTempTableUsing(
userSpecifiedSchema = userSpecifiedSchema,
className = provider,
options = options)
sqlContext.catalog.registerTable(
sqlContext.sessionState.catalog.registerTable(
tableIdent,
Dataset.newDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan)

Expand All @@ -124,7 +124,7 @@ case class CreateTempTableUsingAsSelect(
bucketSpec = None,
options = options)
val result = dataSource.write(mode, df)
sqlContext.catalog.registerTable(
sqlContext.sessionState.catalog.registerTable(
tableIdent,
Dataset.newDataFrame(sqlContext, LogicalRelation(result)).logicalPlan)

Expand All @@ -137,11 +137,11 @@ case class RefreshTable(tableIdent: TableIdentifier)

override def run(sqlContext: SQLContext): Seq[Row] = {
// Refresh the given table's metadata first.
sqlContext.catalog.refreshTable(tableIdent)
sqlContext.sessionState.catalog.refreshTable(tableIdent)

// If this table is cached as a InMemoryColumnarRelation, drop the original
// cached version and make the new version cached lazily.
val logicalPlan = sqlContext.catalog.lookupRelation(tableIdent)
val logicalPlan = sqlContext.sessionState.catalog.lookupRelation(tableIdent)
// Use lookupCachedData directly since RefreshTable also takes databaseName.
val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
if (isCached) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
}

after {
sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
}

test("get all tables") {
Expand All @@ -45,7 +45,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
sql("SHOW tables").filter("tableName = 'ListTablesSuiteTable'"),
Row("ListTablesSuiteTable", true))

sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
}

Expand All @@ -58,7 +58,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
sql("show TABLES in DB").filter("tableName = 'ListTablesSuiteTable'"),
Row("ListTablesSuiteTable", true))

sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
sql("INSERT INTO TABLE t SELECT * FROM tmp")
checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple))
}
sqlContext.catalog.unregisterTable(TableIdentifier("tmp"))
sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
}

test("overwriting") {
Expand All @@ -61,7 +61,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple))
}
sqlContext.catalog.unregisterTable(TableIdentifier("tmp"))
sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
}

test("self-join") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ class HiveContext private[hive](
@transient
protected[sql] override lazy val sessionState = new HiveSessionState(self)

protected[sql] override def catalog = sessionState.catalog

// The Hive UDF current_database() is foldable, will be evaluated by optimizer,
// but the optimizer can't access the SessionState of metadataHive.
sessionState.functionRegistry.registerFunction(
Expand Down Expand Up @@ -349,12 +347,12 @@ class HiveContext private[hive](
*/
def refreshTable(tableName: String): Unit = {
val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
catalog.refreshTable(tableIdent)
sessionState.catalog.refreshTable(tableIdent)
}

protected[hive] def invalidateTable(tableName: String): Unit = {
val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
catalog.invalidateTable(tableIdent)
sessionState.catalog.invalidateTable(tableIdent)
}

/**
Expand All @@ -368,7 +366,7 @@ class HiveContext private[hive](
*/
def analyze(tableName: String) {
val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
val relation = EliminateSubqueryAliases(catalog.lookupRelation(tableIdent))
val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))

relation match {
case relation: MetastoreRelation =>
Expand Down Expand Up @@ -429,7 +427,7 @@ class HiveContext private[hive](
// recorded in the Hive metastore.
// This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
catalog.client.alterTable(
sessionState.catalog.client.alterTable(
relation.table.copy(
properties = relation.table.properties +
(StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,17 @@ case class CreateTableAsSelect(
withFormat
}

hiveContext.catalog.client.createTable(withSchema, ignoreIfExists = false)
hiveContext.sessionState.catalog.client.createTable(withSchema, ignoreIfExists = false)

// Get the Metastore Relation
hiveContext.catalog.lookupRelation(tableIdentifier, None) match {
hiveContext.sessionState.catalog.lookupRelation(tableIdentifier, None) match {
case r: MetastoreRelation => r
}
}
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case of failure occurs while data
// processing.
if (hiveContext.catalog.tableExists(tableIdentifier)) {
if (hiveContext.sessionState.catalog.tableExists(tableIdentifier)) {
if (allowExisting) {
// table already exists, will do nothing, to keep consistent with Hive
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ private[hive] case class CreateViewAsSelect(
override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]

hiveContext.catalog.tableExists(tableIdentifier) match {
hiveContext.sessionState.catalog.tableExists(tableIdentifier) match {
case true if allowExisting =>
// Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
// already exists.

case true if orReplace =>
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
hiveContext.catalog.client.alertView(prepareTable(sqlContext))
hiveContext.sessionState.catalog.client.alertView(prepareTable(sqlContext))

case true =>
// Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
Expand All @@ -66,7 +66,7 @@ private[hive] case class CreateViewAsSelect(
"CREATE OR REPLACE VIEW AS")

case false =>
hiveContext.catalog.client.createView(prepareTable(sqlContext))
hiveContext.sessionState.catalog.client.createView(prepareTable(sqlContext))
}

Seq.empty[Row]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ case class InsertIntoHiveTable(

@transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
@transient private lazy val hiveContext = new Context(sc.hiveconf)
@transient private lazy val catalog = sc.catalog
@transient private lazy val catalog = sc.sessionState.catalog

def output: Seq[Attribute] = Seq.empty

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ case class DropTable(
}
hiveContext.invalidateTable(tableName)
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
hiveContext.catalog.unregisterTable(TableIdentifier(tableName))
hiveContext.sessionState.catalog.unregisterTable(TableIdentifier(tableName))
Seq.empty[Row]
}
}
Expand Down Expand Up @@ -130,7 +130,7 @@ case class CreateMetastoreDataSource(
val tableName = tableIdent.unquotedString
val hiveContext = sqlContext.asInstanceOf[HiveContext]

if (hiveContext.catalog.tableExists(tableIdent)) {
if (hiveContext.sessionState.catalog.tableExists(tableIdent)) {
if (allowExisting) {
return Seq.empty[Row]
} else {
Expand All @@ -142,7 +142,7 @@ case class CreateMetastoreDataSource(
val optionsWithPath =
if (!options.contains("path") && managedIfNoPath) {
isExternal = false
options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableIdent))
options + ("path" -> hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
} else {
options
}
Expand All @@ -155,7 +155,7 @@ case class CreateMetastoreDataSource(
bucketSpec = None,
options = optionsWithPath).resolveRelation()

hiveContext.catalog.createDataSourceTable(
hiveContext.sessionState.catalog.createDataSourceTable(
tableIdent,
userSpecifiedSchema,
Array.empty[String],
Expand Down Expand Up @@ -200,13 +200,13 @@ case class CreateMetastoreDataSourceAsSelect(
val optionsWithPath =
if (!options.contains("path")) {
isExternal = false
options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableIdent))
options + ("path" -> hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
} else {
options
}

var existingSchema = None: Option[StructType]
if (sqlContext.catalog.tableExists(tableIdent)) {
if (sqlContext.sessionState.catalog.tableExists(tableIdent)) {
// Check if we need to throw an exception or just return.
mode match {
case SaveMode.ErrorIfExists =>
Expand All @@ -230,7 +230,8 @@ case class CreateMetastoreDataSourceAsSelect(
// TODO: Check that options from the resolved relation match the relation that we are
// inserting into (i.e. using the same compression).

EliminateSubqueryAliases(sqlContext.catalog.lookupRelation(tableIdent)) match {
EliminateSubqueryAliases(
sqlContext.sessionState.catalog.lookupRelation(tableIdent)) match {
case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
existingSchema = Some(l.schema)
case o =>
Expand Down Expand Up @@ -267,7 +268,7 @@ case class CreateMetastoreDataSourceAsSelect(
// We will use the schema of resolved.relation as the schema of the table (instead of
// the schema of df). It is important since the nullability may be changed by the relation
// provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
hiveContext.catalog.createDataSourceTable(
hiveContext.sessionState.catalog.createDataSourceTable(
tableIdent,
Some(result.schema),
partitionColumns,
Expand All @@ -278,7 +279,7 @@ case class CreateMetastoreDataSourceAsSelect(
}

// Refresh the cache of the table in the catalog.
hiveContext.catalog.refreshTable(tableIdent)
hiveContext.sessionState.catalog.refreshTable(tableIdent)
Seq.empty[Row]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -427,9 +427,9 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {

cacheManager.clearCache()
loadedTables.clear()
catalog.cachedDataSourceTables.invalidateAll()
catalog.client.reset()
catalog.unregisterAllTables()
sessionState.catalog.cachedDataSourceTables.invalidateAll()
sessionState.catalog.client.reset()
sessionState.catalog.unregisterAllTables()

FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)).
foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void setUp() throws IOException {
if (path.exists()) {
path.delete();
}
hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath(
hiveManagedPath = new Path(sqlContext.sessionState().catalog().hiveDefaultTableFilePath(
new TableIdentifier("javaSavedTable")));
fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
if (fs.exists(hiveManagedPath)){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
.saveAsTable("t")
}

val hiveTable = catalog.client.getTable("default", "t")
val hiveTable = sessionState.catalog.client.getTable("default", "t")
assert(hiveTable.storage.inputFormat === Some(inputFormat))
assert(hiveTable.storage.outputFormat === Some(outputFormat))
assert(hiveTable.storage.serde === Some(serde))
Expand Down Expand Up @@ -114,7 +114,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
.saveAsTable("t")
}

val hiveTable = catalog.client.getTable("default", "t")
val hiveTable = sessionState.catalog.client.getTable("default", "t")
assert(hiveTable.storage.inputFormat === Some(inputFormat))
assert(hiveTable.storage.outputFormat === Some(outputFormat))
assert(hiveTable.storage.serde === Some(serde))
Expand Down Expand Up @@ -144,7 +144,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
|AS SELECT 1 AS d1, "val_1" AS d2
""".stripMargin)

val hiveTable = catalog.client.getTable("default", "t")
val hiveTable = sessionState.catalog.client.getTable("default", "t")
assert(hiveTable.storage.inputFormat === Some(inputFormat))
assert(hiveTable.storage.outputFormat === Some(outputFormat))
assert(hiveTable.storage.serde === Some(serde))
Expand Down
Loading

0 comments on commit c5fd485

Please sign in to comment.