Skip to content

Commit

Permalink
[SPARK-12968][SQL] Implement command to set current database
Browse files Browse the repository at this point in the history
JIRA: https://issues.apache.org/jira/browse/SPARK-12968

Implement command to set current database.

Author: Liang-Chi Hsieh <viirya@gmail.com>
Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #10916 from viirya/ddl-use-database.
  • Loading branch information
viirya authored and rxin committed Jan 29, 2016
1 parent b9dfdcc commit 66449b8
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ trait Catalog {

def lookupRelation(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan

def setCurrentDatabase(databaseName: String): Unit = {
throw new UnsupportedOperationException
}

/**
* Returns tuples of (tableName, isTemporary) for all tables in the given database.
* isTemporary is a Boolean value indicates if a table is a temporary or not.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
getClauses(Seq("TOK_QUERY", "FORMATTED", "EXTENDED"), explainArgs)
ExplainCommand(nodeToPlan(query), extended = extended.isDefined)

case Token("TOK_SWITCHDATABASE", Token(database, Nil) :: Nil) =>
SetDatabaseCommand(cleanIdentifier(database))

case Token("TOK_DESCTABLE", describeArgs) =>
// Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
val Some(tableType) :: formatted :: extended :: pretty :: Nil =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,3 +408,13 @@ case class DescribeFunction(
}
}
}

case class SetDatabaseCommand(databaseName: String) extends RunnableCommand {

override def run(sqlContext: SQLContext): Seq[Row] = {
sqlContext.catalog.setCurrentDatabase(databaseName)
Seq.empty[Row]
}

override val output: Seq[Attribute] = Seq.empty
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
"CREATE DATABASE hive_test_db;"
-> "OK",
"USE hive_test_db;"
-> "OK",
-> "",
"CREATE TABLE hive_test(key INT, val STRING);"
-> "OK",
"SHOW TABLES;"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,10 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
}

override def unregisterAllTables(): Unit = {}

override def setCurrentDatabase(databaseName: String): Unit = {
client.setCurrentDatabase(databaseName)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,6 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
"TOK_SHOWLOCKS",
"TOK_SHOWPARTITIONS",

"TOK_SWITCHDATABASE",

"TOK_UNLOCKTABLE"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ private[hive] trait ClientInterface {
/** Returns the name of the active database. */
def currentDatabase: String

/** Sets the name of current database. */
def setCurrentDatabase(databaseName: String): Unit

/** Returns the metadata for specified database, throwing an exception if it doesn't exist */
def getDatabase(name: String): HiveDatabase = {
getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.util.{CircularBuffer, Utils}
Expand Down Expand Up @@ -229,6 +230,14 @@ private[hive] class ClientWrapper(
state.getCurrentDatabase
}

override def setCurrentDatabase(databaseName: String): Unit = withHiveState {
if (getDatabaseOption(databaseName).isDefined) {
state.setCurrentDatabase(databaseName)
} else {
throw new NoSuchDatabaseException
}
}

override def createDatabase(database: HiveDatabase): Unit = withHiveState {
client.createDatabase(
new Database(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.scalatest.BeforeAndAfter

import org.apache.spark.{SparkException, SparkFiles}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin
Expand Down Expand Up @@ -1262,6 +1263,21 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {

}

test("use database") {
val currentDatabase = sql("select current_database()").first().getString(0)

sql("CREATE DATABASE hive_test_db")
sql("USE hive_test_db")
assert("hive_test_db" == sql("select current_database()").first().getString(0))

intercept[NoSuchDatabaseException] {
sql("USE not_existing_db")
}

sql(s"USE $currentDatabase")
assert(currentDatabase == sql("select current_database()").first().getString(0))
}

test("lookup hive UDF in another thread") {
val e = intercept[AnalysisException] {
range(1).selectExpr("not_a_udf()")
Expand Down

0 comments on commit 66449b8

Please sign in to comment.