[SPARK-31709][SQL] Proper base path for database/table location when it is a relative path #28527
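This patch makes a relative LOCATION for a database resolve against the Spark warehouse path, and a relative LOCATION for a table resolve against its database's location, instead of being qualified against the filesystem's current working directory. A hedged sketch of the intended end-to-end behavior; the warehouse dir, database, and table names below are illustrative, not from the patch:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local")
  .config("spark.sql.warehouse.dir", "/tmp/warehouse") // assumed warehouse dir
  .getOrCreate()

// A relative database location is anchored under the warehouse path:
spark.sql("CREATE DATABASE db1 LOCATION 'relative/db/path'")
// db1's location -> file:/tmp/warehouse/relative/db/path

// A relative table location is anchored under its database's location:
spark.sql("CREATE TABLE db1.t(i INT) USING parquet LOCATION 'relative/tbl/path'")
// t's location -> file:/tmp/warehouse/relative/db/path/relative/tbl/path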

@@ -209,12 +209,20 @@ class SessionCatalog(
"you cannot create a database with this name.")
}
validateName(dbName)
- val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri)
externalCatalog.createDatabase(
- dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
+ dbDefinition.copy(name = dbName, locationUri = makeQualifiedDBPath(dbDefinition.locationUri)),
ignoreIfExists)
}

private def makeQualifiedDBPath(locationUri: URI): URI = {
if (locationUri.isAbsolute) {
locationUri
} else {
val fullPath = new Path(conf.warehousePath, CatalogUtils.URIToString(locationUri))
makeQualifiedPath(fullPath.toUri)
}
}
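A minimal standalone sketch of the rule above, assuming a warehouse path of /tmp/warehouse; the real method additionally routes through makeQualifiedPath, which adds the filesystem scheme and authority. This is also what anchors the now-relative result of getDefaultDBPath (see below):

import java.net.URI
import org.apache.hadoop.fs.Path

val warehousePath = "/tmp/warehouse"   // stand-in for conf.warehousePath
val dbUri = new URI("my/db/dir")       // relative location: isAbsolute == false
val anchored =
  if (dbUri.isAbsolute) dbUri
  else new Path(warehousePath, dbUri.toString).toUri
// anchored == new URI("/tmp/warehouse/my/db/dir"); makeQualifiedPath would
// then turn it into e.g. file:/tmp/warehouse/my/db/dir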

def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
val dbName = formatDatabaseName(db)
if (dbName == DEFAULT_DATABASE) {
@@ -231,7 +239,8 @@
def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
val dbName = formatDatabaseName(dbDefinition.name)
requireDbExists(dbName)
- externalCatalog.alterDatabase(dbDefinition.copy(name = dbName))
+ externalCatalog.alterDatabase(dbDefinition.copy(
+   name = dbName, locationUri = makeQualifiedDBPath(dbDefinition.locationUri)))
Member: We need to update the location uri here?

Member Author: We have the ALTER (DATABASE|SCHEMA) database_name SET LOCATION path syntax, which works for Hive 3.x.

Member: Ah, I see.
}
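For reference, a hedged usage sketch of the syntax mentioned in the thread above; the database name and path are illustrative:

// Points db1 at a new default location for objects created afterwards;
// per the comment above, this works against Hive 3.x metastores.
spark.sql("ALTER DATABASE db1 SET LOCATION '/tmp/new/db1/location'")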

def getDatabaseMetadata(db: String): CatalogDatabase = {
@@ -273,8 +282,7 @@
* by users.
*/
def getDefaultDBPath(db: String): URI = {
- val database = formatDatabaseName(db)
- new Path(new Path(conf.warehousePath), database + ".db").toUri
+ CatalogUtils.stringToURI(formatDatabaseName(db) + ".db")
}

// ----------------------------------------------------------------------------
@@ -307,7 +315,7 @@
&& !tableDefinition.storage.locationUri.get.isAbsolute) {
// make the location of the table qualified.
val qualifiedTableLocation =
- makeQualifiedPath(tableDefinition.storage.locationUri.get)
+ makeQualifiedTablePath(tableDefinition.storage.locationUri.get, db)
tableDefinition.copy(
storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
identifier = tableIdentifier)
@@ -340,6 +348,16 @@
}
}

private def makeQualifiedTablePath(locationUri: URI, database: String): URI = {
if (locationUri.isAbsolute) {
locationUri
} else {
val dbName = formatDatabaseName(database)
val dbLocation = makeQualifiedDBPath(getDatabaseMetadata(dbName).locationUri)
Contributor: I'm a bit concerned about it as it adds an extra database lookup. Is it better to push this work to the underlying external catalog?

Member Author: Do you mean that we are calling requireDbExists(dbName) repeatedly?

Contributor: Ideally createTable should only do one RPC to the Hive metastore. requireDbExists is one problem, but we can simply remove it. However, the new database lookup can't easily be removed.

Member Author: > However, the new database lookup can't easily be removed.

Yes, it seems to make no difference even if we push this into the external catalogs; we still have to call the getDatabase API to get the actual path of the database for this particular case.

Contributor: What happens if we don't qualify the path here and leave it to the Hive metastore? Will it still be a relative path in the Hive metastore?

Member Author: FYI, #17254

Contributor: OK, we did the same thing for partitions. LGTM then.

new Path(new Path(dbLocation), CatalogUtils.URIToString(locationUri)).toUri
}
}
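Putting the two helpers together, a standalone sketch of the resolution order with assumed example paths (the scheme-qualification step is elided):

import java.net.URI
import org.apache.hadoop.fs.Path

def anchor(base: String, uri: URI): URI =
  if (uri.isAbsolute) uri else new Path(base, uri.toString).toUri

val warehouse = "/tmp/warehouse"                       // conf.warehousePath stand-in
val dbLocation = anchor(warehouse, new URI("db1.db"))  // database dir under the warehouse
val tblLocation = anchor(dbLocation.toString, new URI("relative/path")) // table dir under the db
// tblLocation == new URI("/tmp/warehouse/db1.db/relative/path"), matching the
// "db.db/relative/path" expectation in the tests below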

/**
* Alter the metadata of an existing metastore table identified by `tableDefinition`.
*
@@ -359,7 +377,7 @@
&& !tableDefinition.storage.locationUri.get.isAbsolute) {
// make the location of the table qualified.
val qualifiedTableLocation =
- makeQualifiedPath(tableDefinition.storage.locationUri.get)
+ makeQualifiedTablePath(tableDefinition.storage.locationUri.get, db)
tableDefinition.copy(
storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
identifier = tableIdentifier)
@@ -126,11 +126,18 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
val schema = CatalogV2Util.applySchemaChanges(catalogTable.schema, changes)
val comment = properties.get(TableCatalog.PROP_COMMENT)
val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner)
val location = properties.get(TableCatalog.PROP_LOCATION).map(CatalogUtils.stringToURI)
val storage = if (location.isDefined) {
catalogTable.storage.copy(locationUri = location)
} else {
catalogTable.storage
}

try {
catalog.alterTable(
- catalogTable
-   .copy(properties = properties, schema = schema, owner = owner, comment = comment))
+ catalogTable.copy(
+   properties = properties, schema = schema, owner = owner, comment = comment,
+   storage = storage))
} catch {
case _: NoSuchTableException =>
throw new NoSuchTableException(ident)
@@ -17,17 +17,19 @@

package org.apache.spark.sql.execution.datasources.v2

import java.net.URI
import java.util
import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
- import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, NamespaceChange, SupportsNamespaces, TableChange}
+ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange, V1Table}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -160,6 +162,36 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
assert(catalog.tableExists(testIdent))
}

private def makeQualifiedPathWithWarehouse(path: String): URI = {
val p = new Path(spark.sessionState.conf.warehousePath, path)
val fs = p.getFileSystem(spark.sessionState.newHadoopConf())
fs.makeQualified(p).toUri
}

test("createTable: location") {
val catalog = newCatalog()
val properties = new util.HashMap[String, String]()
assert(!catalog.tableExists(testIdent))

// default location
val t1 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table]
assert(t1.catalogTable.location ===
spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier))
catalog.dropTable(testIdent)

// relative path
properties.put(TableCatalog.PROP_LOCATION, "relative/path")
val t2 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table]
assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path"))
catalog.dropTable(testIdent)

// absolute path
properties.put(TableCatalog.PROP_LOCATION, "/absolute/path")
val t3 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table]
assert(t3.catalogTable.location.toString === "file:/absolute/path")
}
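An aside on the last assertion: a scheme-less absolute location ends up qualified with the session filesystem's scheme, which is why "/absolute/path" comes back as "file:/absolute/path" on the local filesystem. A minimal illustration with Hadoop's API:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val p = new Path("/absolute/path")
val fs = p.getFileSystem(new Configuration()) // local FS under the default config
assert(fs.makeQualified(p).toUri.toString == "file:/absolute/path")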

test("tableExists") {
val catalog = newCatalog()

@@ -640,6 +672,26 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
assert(exc.message.contains("not found"))
}

test("alterTable: location") {
val catalog = newCatalog()
assert(!catalog.tableExists(testIdent))

// default location
val t1 = catalog.createTable(testIdent, schema, Array.empty, emptyProps).asInstanceOf[V1Table]
assert(t1.catalogTable.location ===
spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier))

// relative path
val t2 = catalog.alterTable(testIdent,
TableChange.setProperty(TableCatalog.PROP_LOCATION, "relative/path")).asInstanceOf[V1Table]
assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path"))

// absolute path
val t3 = catalog.alterTable(testIdent,
TableChange.setProperty(TableCatalog.PROP_LOCATION, "/absolute/path")).asInstanceOf[V1Table]
assert(t3.catalogTable.location.toString === "file:/absolute/path")
}

test("dropTable") {
val catalog = newCatalog()

@@ -812,11 +864,16 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {

test("createNamespace: basic behavior") {
val catalog = newCatalog()
- val expectedPath = sqlContext.sessionState.catalog.getDefaultDBPath(testNs(0)).toString
+ val sessionCatalog = sqlContext.sessionState.catalog
+ val expectedPath =
+   new Path(spark.sessionState.conf.warehousePath,
+     sessionCatalog.getDefaultDBPath(testNs(0)).toString).toString

catalog.createNamespace(testNs, Map("property" -> "value").asJava)

- assert(expectedPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString)
+ assert(expectedPath === spark.catalog.getDatabase(testNs(0)).locationUri)

assert(catalog.namespaceExists(testNs) === true)
val metadata = catalog.loadNamespaceMetadata(testNs).asScala
@@ -842,6 +899,23 @@
catalog.dropNamespace(testNs)
}

test("createNamespace: relative location") {
val catalog = newCatalog()
val expectedPath =
new Path(spark.sessionState.conf.warehousePath, "a/b/c").toString

catalog.createNamespace(testNs, Map("location" -> "a/b/c").asJava)

assert(expectedPath === spark.catalog.getDatabase(testNs(0)).locationUri)

assert(catalog.namespaceExists(testNs) === true)
val metadata = catalog.loadNamespaceMetadata(testNs).asScala
checkMetadata(metadata, Map.empty)
assert(expectedPath === metadata("location"))

catalog.dropNamespace(testNs)
}

test("createNamespace: fail if namespace already exists") {
val catalog = newCatalog()

@@ -954,16 +1028,23 @@

test("alterNamespace: update namespace location") {
val catalog = newCatalog()
- val initialPath = sqlContext.sessionState.catalog.getDefaultDBPath(testNs(0)).toString
- val newPath = "file:/tmp/db.db"
+ val initialPath =
+   new Path(spark.sessionState.conf.warehousePath,
+     spark.sessionState.catalog.getDefaultDBPath(testNs(0)).toString).toString
+ val newAbsoluteUri = "file:/tmp/db.db"

catalog.createNamespace(testNs, emptyProps)

- assert(initialPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString)
+ assert(initialPath === spark.catalog.getDatabase(testNs(0)).locationUri)

- catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", newPath))
- assert(newPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString)
+ catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", newAbsoluteUri))
+ assert(newAbsoluteUri === spark.catalog.getDatabase(testNs(0)).locationUri)
+
+ val newAbsolutePath = "/tmp/newAbsolutePath"
+ catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", newAbsolutePath))
+ assert("file:" + newAbsolutePath === spark.catalog.getDatabase(testNs(0)).locationUri)
+
+ val newRelativePath = new Path(spark.sessionState.conf.warehousePath, "relativeP").toString
+ catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", "relativeP"))
+ assert(newRelativePath === spark.catalog.getDatabase(testNs(0)).locationUri)

catalog.dropNamespace(testNs)
}