[SPARK-19667][SQL] Create table with Hive enabled in the default database uses the warehouse path instead of the location of the default database #17001

Closed

wants to merge 31 commits

Changes from 27 commits

Commits (31)
aebdfc6
[SPARK-19667][SQL]create table with hiveenabled in default database u…
windpiger Feb 20, 2017
825c0ad
rename a conf name
windpiger Feb 20, 2017
a2c9168
fix test faile
windpiger Feb 21, 2017
bacd528
process default database location when create/get database from metas…
windpiger Feb 22, 2017
3f6e061
remove an redundant line
windpiger Feb 22, 2017
96dcc7d
fix empty string location of database
windpiger Feb 22, 2017
f329387
modify the test case
windpiger Feb 22, 2017
83dba73
Merge branch 'master' into defaultDBPathInHive
windpiger Feb 22, 2017
58a0020
fix test failed
windpiger Feb 22, 2017
1dce2d7
add log to find out why jenkins failed
windpiger Feb 22, 2017
12f81d3
add scalastyle:off for println
windpiger Feb 22, 2017
56e83d5
fix test faile
windpiger Feb 22, 2017
901bb1c
make warehouse path qualified for default database
windpiger Feb 23, 2017
99d9746
remove a string s
windpiger Feb 23, 2017
db555e3
modify a comment
windpiger Feb 23, 2017
d327994
fix test failed
windpiger Feb 23, 2017
73c8802
move to sessioncatalog
windpiger Feb 23, 2017
747b31a
remove import
windpiger Feb 23, 2017
8f8063f
remove an import
windpiger Feb 23, 2017
4dc11c1
modify some codestyle and some comment
windpiger Feb 24, 2017
9c0773b
Merge branch 'defaultDBPathInHive' of github.com:windpiger/spark into…
windpiger Feb 24, 2017
80b8133
mv defaultdb path logic to ExternalCatalog
windpiger Feb 27, 2017
41ea115
modify a comment
windpiger Feb 27, 2017
13245e4
modify a comment
windpiger Feb 27, 2017
096ae63
add final def
windpiger Mar 1, 2017
badd61b
modify some code
windpiger Mar 2, 2017
35d2b59
add lazy flag
windpiger Mar 2, 2017
e3a467e
modify test case
windpiger Mar 3, 2017
ae9938a
modify test case
windpiger Mar 3, 2017
7739ccd
mv getdatabase
windpiger Mar 3, 2017
f93f5d3
merge with master
windpiger Mar 8, 2017
ExternalCatalog.scala

@@ -17,6 +17,9 @@

package org.apache.spark.sql.catalyst.catalog

import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
import org.apache.spark.sql.catalyst.expressions.Expression

@@ -30,9 +33,20 @@ import org.apache.spark.sql.catalyst.expressions.Expression
*
* Implementations should throw [[NoSuchDatabaseException]] when databases don't exist.
*/
abstract class ExternalCatalog {
abstract class ExternalCatalog(conf: SparkConf, hadoopConf: Configuration) {
Contributor

How about we just pass in a defaultDB: CatalogDatabase? Then we don't need to add the protected def warehousePath: String.

Contributor Author

I think conf/hadoopConf is more useful, since later logic can use them, and the subclasses also have these two confs.

Contributor

We still have conf/hadoopConf in InMemoryCatalog and HiveExternalCatalog; we can just add one more parameter.

Contributor Author

if we pass a defaultDB, it seems like we introduce an instance of defaultDB as we discussed above

Contributor

but it will be only used in getDatabase, and we can save a metastore call to get the default database.

Contributor Author

ok~ let me fix it~

Contributor Author (@windpiger, Mar 2, 2017)

@cloud-fan I found that if we add a parameter defaultDB to ExternalCatalog and its subclasses InMemoryCatalog and HiveExternalCatalog, this change will require a lot of related code to be modified, such as test cases and other places where InMemoryCatalog and HiveExternalCatalog are created.

For example, currently all the parameters of InMemoryCatalog have their own default values:

class InMemoryCatalog(conf: SparkConf = new SparkConf, hadoopConfig: Configuration = new Configuration)

We can create it without any parameters, but if we add a defaultDB, we would have to provide a default defaultDB in the parameter list, and we cannot construct a legal defaultDB there because we cannot get the warehouse path for it, like this:

class InMemoryCatalog(conf: SparkConf = new SparkConf, hadoopConfig: Configuration = new Configuration, defaultDB: CatalogDatabase = CatalogDatabase("default", "", "${can not get the warehouse path}", Map.empty))

If we don't provide a default value for defaultDB in the parameter list, this will cause even more code changes, which I don't think is proper.

What about keeping the protected def warehousePath in ExternalCatalog, and adding a

lazy val defaultDB = {
  val qualifiedWarehousePath = SessionCatalog.makeQualifiedPath(warehousePath, hadoopConf).toString
  CatalogDatabase("default", "", qualifiedWarehousePath, Map.empty)
}

This also avoids the metastore call in getDatabase.

Contributor Author

I have modified the code by adding

lazy val defaultDB = {
  val qualifiedWarehousePath = SessionCatalog.makeQualifiedPath(warehousePath, hadoopConf).toString
  CatalogDatabase("default", "", qualifiedWarehousePath, Map.empty)
}

in ExternalCatalog.

If it is not ok, I will revert it, thanks~

import CatalogTypes.TablePartitionSpec

lazy val defaultDB: CatalogDatabase = {
val qualifiedWarehousePath = SessionCatalog
.makeQualifiedPath(warehousePath, hadoopConf).toString
CatalogDatabase(
SessionCatalog.DEFAULT_DATABASE,
"The default database created by Spark using current warehouse path",
qualifiedWarehousePath,
Map.empty
)
}

protected def requireDbExists(db: String): Unit = {
if (!databaseExists(db)) {
throw new NoSuchDatabaseException(db)
@@ -74,7 +88,17 @@ abstract class ExternalCatalog {
*/
def alterDatabase(dbDefinition: CatalogDatabase): Unit

def getDatabase(db: String): CatalogDatabase
final def getDatabase(db: String): CatalogDatabase = {
val database = getDatabaseInternal(db)
Contributor

put this in the else branch

// The default database's location always uses the warehouse path.
// Since the location of database stored in metastore is qualified,
// we also make the warehouse location qualified.
if (db == SessionCatalog.DEFAULT_DATABASE) {
Contributor

Can we make the default database a totally virtual concept? I.e., we never create it or ask the metastore to retrieve it; we just keep an instance of CatalogDatabase for the default database and return it in ExternalCatalog.getDatabase.

Contributor Author

I think if we create a default instance of CatalogDatabase, we will have to add more logic to other interfaces. For example, in databaseExists we would need logic that always returns true for the default database, even if there is no default database in Hive.

Contributor

makes sense

defaultDB
} else {
database
}
}

def databaseExists(db: String): Boolean

@@ -269,4 +293,7 @@ abstract class ExternalCatalog {

def listFunctions(db: String, pattern: String): Seq[String]

protected def getDatabaseInternal(db: String): CatalogDatabase

protected def warehousePath: String
}
InMemoryCatalog.scala

@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.StringUtils
class InMemoryCatalog(
conf: SparkConf = new SparkConf,
hadoopConfig: Configuration = new Configuration)
extends ExternalCatalog {
extends ExternalCatalog(conf, hadoopConfig) {

import CatalogTypes.TablePartitionSpec

@@ -93,6 +93,9 @@ class InMemoryCatalog(
}
}

// For InMemoryCatalog, default database location is equal to warehouse path
protected override def warehousePath: String =
catalog(SessionCatalog.DEFAULT_DATABASE).db.locationUri
// --------------------------------------------------------------------------
// Databases
// --------------------------------------------------------------------------
@@ -156,7 +159,7 @@ class InMemoryCatalog(
catalog(dbDefinition.name).db = dbDefinition
}

override def getDatabase(db: String): CatalogDatabase = synchronized {
protected override def getDatabaseInternal(db: String): CatalogDatabase = synchronized {
requireDbExists(db)
catalog(db).db
}
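To make the new contract concrete, here is a small usage sketch (not part of the PR) written against this branch's InMemoryCatalog. The paths and the DefaultDbSketch name are invented for illustration; the point is that getDatabase("default") is now answered by the lazily built defaultDB, whose location is the qualified warehouse path, while any other database still goes through getDatabaseInternal.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog}

object DefaultDbSketch {
  def main(args: Array[String]): Unit = {
    val catalog = new InMemoryCatalog(new SparkConf, new Configuration)

    // For InMemoryCatalog the warehouse path is read from the registered
    // default database, so register it first (SharedState normally does this).
    catalog.createDatabase(
      CatalogDatabase("default", "", "file:/tmp/warehouse", Map.empty),
      ignoreIfExists = true)
    // Served from the special-cased defaultDB: its location is the qualified warehouse path.
    println(catalog.getDatabase("default").locationUri)

    catalog.createDatabase(
      CatalogDatabase("other", "", "file:/tmp/other.db", Map.empty),
      ignoreIfExists = true)
    // Non-default databases are still fetched via getDatabaseInternal.
    println(catalog.getDatabase("other").locationUri)
  }
}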
SessionCatalog.scala

@@ -37,6 +37,18 @@ import org.apache.spark.sql.catalyst.util.StringUtils

object SessionCatalog {
val DEFAULT_DATABASE = "default"

/**
* This method is used to make the given path qualified before we
* store this path in the underlying external catalog. So, when a path
* does not contain a scheme, this path will not be changed after the default
* FileSystem is changed.
*/
def makeQualifiedPath(path: String, conf: Configuration): Path = {
val hadoopPath = new Path(path)
val fs = hadoopPath.getFileSystem(conf)
fs.makeQualified(hadoopPath)
}
}

/**
@@ -125,18 +137,6 @@ class SessionCatalog(
CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
}

/**
* This method is used to make the given path qualified before we
* store this path in the underlying external catalog. So, when a path
* does not contain a scheme, this path will not be changed after the default
* FileSystem is changed.
*/
private def makeQualifiedPath(path: String): Path = {
val hadoopPath = new Path(path)
val fs = hadoopPath.getFileSystem(hadoopConf)
fs.makeQualified(hadoopPath)
}

private def requireDbExists(db: String): Unit = {
if (!databaseExists(db)) {
throw new NoSuchDatabaseException(db)
@@ -170,7 +170,7 @@ class SessionCatalog(
"you cannot create a database with this name.")
}
validateName(dbName)
val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri, hadoopConf).toString
externalCatalog.createDatabase(
dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
ignoreIfExists)
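Since makeQualifiedPath moves from a private method on SessionCatalog to its companion object, other catalogs (here, ExternalCatalog) can reuse it. A brief illustrative sketch follows; it is not from the PR, the paths are invented, and it assumes the default Hadoop Configuration resolves to the local file system.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.catalog.SessionCatalog

object QualifyPathSketch {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    // A scheme-less path gets the default FileSystem's scheme attached,
    // e.g. "/user/hive/warehouse" becomes "file:/user/hive/warehouse" locally.
    println(SessionCatalog.makeQualifiedPath("/user/hive/warehouse", hadoopConf))
    // An already qualified path is returned unchanged, which is why storing
    // qualified locations keeps them stable when the default FileSystem changes.
    println(SessionCatalog.makeQualifiedPath("file:/tmp/warehouse", hadoopConf))
  }
}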
HiveExternalCatalog.scala

@@ -52,7 +52,7 @@ import org.apache.spark.sql.types.{DataType, StructType}
* All public methods must be synchronized for thread-safety.
*/
private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
extends ExternalCatalog with Logging {
extends ExternalCatalog(conf, hadoopConf) with Logging {

import CatalogTypes.TablePartitionSpec
import HiveExternalCatalog._
@@ -129,6 +129,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
}

protected override def warehousePath: String = conf.get(WAREHOUSE_PATH)
// --------------------------------------------------------------------------
// Databases
// --------------------------------------------------------------------------
@@ -162,7 +163,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
client.alterDatabase(dbDefinition)
}

override def getDatabase(db: String): CatalogDatabase = withClient {
protected override def getDatabaseInternal(db: String): CatalogDatabase = withClient {
client.getDatabase(db)
}

HiveSparkSubmitSuite.scala

@@ -334,6 +334,35 @@ class HiveSparkSubmitSuite
runSparkSubmit(argsForShowTables)
}

test("SPARK-19667: create table in default database with HiveEnabled use warehouse path " +
"instead of the location of default database") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val warehousePath1 = Utils.createTempDir("wh1")
val argsForCreateTable = Seq(
"--class", SPARK_19667_CREATE_TABLE.getClass.getName.stripSuffix("$"),
"--name", "SPARK-19667",
"--master", "local-cluster[2,1,1024]",
"--conf", "spark.ui.enabled=false",
"--conf", "spark.master.rest.enabled=false",
"--conf", s"spark.sql.warehouse.dir=$warehousePath1",
unusedJar.toString)
runSparkSubmit(argsForCreateTable)

val warehousePath2 = Utils.createTempDir("wh2")
val argsForShowTables = Seq(
"--class", SPARK_19667_VERIFY_TABLE_PATH.getClass.getName.stripSuffix("$"),
"--name", "SPARK-19667",
"--master", "local-cluster[2,1,1024]",
"--conf", "spark.ui.enabled=false",
"--conf", "spark.master.rest.enabled=false",
"--conf", s"spark.sql.warehouse.dir=$warehousePath2",
unusedJar.toString)
runSparkSubmit(argsForShowTables)

Utils.deleteRecursively(warehousePath1)
Utils.deleteRecursively(warehousePath2)
}

// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
// This is copied from org.apache.spark.deploy.SparkSubmitSuite
private def runSparkSubmit(args: Seq[String]): Unit = {
@@ -905,3 +934,91 @@ object SPARK_18989_DESC_TABLE {
}
}
}

object SPARK_19667_CREATE_TABLE {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
try {
val warehousePath = s"file:${spark.sharedState.warehousePath.stripSuffix("/")}"
val defaultDB = spark.sessionState.catalog.getDatabaseMetadata("default")
// default database use warehouse path as its location
Member

default -> The default
use warehouse path -> uses the warehouse path

assert(defaultDB.locationUri.stripSuffix("/") == warehousePath)
spark.sql("CREATE TABLE t(a string)")

val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
// table in default database use the location of default database which is also warehouse path
assert(table.location.stripSuffix("/") == s"$warehousePath/t")
spark.sql("INSERT INTO TABLE t SELECT 1")
assert(spark.sql("SELECT * FROM t").count == 1)

spark.sql("CREATE DATABASE not_default")
spark.sql("USE not_default")
spark.sql("CREATE TABLE t1(b string)")
val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
// table in not default database use the location of its own database
assert(table1.location.stripSuffix("/") == s"$warehousePath/not_default.db/t1")
} finally {
spark.sql("USE default")
}
}
}

object SPARK_19667_VERIFY_TABLE_PATH {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
try {
val warehousePath = s"file:${spark.sharedState.warehousePath.stripSuffix("/")}"
Contributor Author

I am making this change.

val defaultDB = spark.sessionState.catalog.getDatabaseMetadata("default")
// default database use warehouse path as its location
assert(defaultDB.locationUri.stripSuffix("/") == warehousePath)

val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
// the table in default database created in job(SPARK_19667_CREATE_TABLE) above,
// which has different warehouse path from this job, its location still equals to
// the location when it's created.
Member (@gatorsmile, Mar 12, 2017)

How about?

For the table created by another job in the default database, the location of this table is not changed, even if the current job has a different warehouse path.

assert(table.location.stripSuffix("/") != s"$warehousePath/t")
assert(spark.sql("SELECT * FROM t").count == 1)

spark.sql("CREATE TABLE t3(d string)")
val table3 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t3"))
// the table in default database created here in this job, it will use the warehouse path
// of this job as its location
Member

->

When a job creates a table in the default database, the table location is under the warehouse path
that is configured for the local job.

assert(table3.location.stripSuffix("/") == s"$warehousePath/t3")

spark.sql("USE not_default")
val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
// the table in not default database create in job(SPARK_19667_CREATE_TABLE) above,
// which has different warehouse path from this job, its location still equals to
// the location when it's created.
assert(table1.location.stripSuffix("/") != s"$warehousePath/not_default.db/t1")
assert(!new File(s"$warehousePath/not_default.db/t1").exists())

spark.sql("CREATE TABLE t2(c string)")
val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2"))
// the table in not default database created here in this job, it will use the location
// of the database as its location, not the warehouse path in this job
Member (@gatorsmile, Mar 12, 2017)

->

The table created in the non-default database (created in another job) is under the database location.

assert(table2.location.stripSuffix("/") != s"$warehousePath/not_default.db/t2")

spark.sql("CREATE DATABASE not_default_1")
Member

-> non_default_db1

spark.sql("USE not_default_1")
spark.sql("CREATE TABLE t4(e string)")
val table4 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t4"))
// the table created in the database which created in this job, it will use the location
// of the database.
Member (@gatorsmile, Mar 12, 2017)

->

The table created in the non-default database (created in this job) is under the database location.

assert(table4.location.stripSuffix("/") == s"$warehousePath/not_default_1.db/t4")

} finally {
spark.sql("DROP TABLE IF EXISTS t4")
spark.sql("DROP DATABASE not_default_1")

spark.sql("USE not_default")
spark.sql("DROP TABLE IF EXISTS t1")
spark.sql("DROP TABLE IF EXISTS t2")
spark.sql("DROP DATABASE not_default")

spark.sql("USE default")
spark.sql("DROP TABLE IF EXISTS t")
spark.sql("DROP TABLE IF EXISTS t3")
}
}
}
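For reference, a hedged end-to-end sketch (not part of the PR) of the behaviour the two jobs above exercise: with Hive support enabled, a table created in the default database should end up under the session's spark.sql.warehouse.dir, even when the Hive metastore has recorded a different location for the default database. The warehouse path and the Spark19667Demo name are invented.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier

object Spark19667Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .config("spark.sql.warehouse.dir", "/tmp/new-warehouse")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("CREATE TABLE demo_t(a STRING)")
    val location = spark.sessionState.catalog
      .getTableMetadata(TableIdentifier("demo_t"))
      .location
    // Expected after this change: the table sits under /tmp/new-warehouse,
    // not under whatever location the metastore stored for the default database.
    println(location)
  }
}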