[SPARK-31709][SQL] Proper base path for location when it is a relative path
yaooqinn committed May 14, 2020
1 parent 3772154 commit 93c19e4
Showing 3 changed files with 124 additions and 19 deletions.
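For orientation, a hedged sketch of the behavior this commit targets (not code from the diff; the warehouse path, database, and table names are illustrative): after this change a relative database LOCATION resolves against spark.sql.warehouse.dir, and a relative table LOCATION resolves against the location of its database.

// Illustrative only; assumes spark.sql.warehouse.dir = /warehouse.
spark.sql("CREATE DATABASE mydb LOCATION 'mydb_dir'")
// mydb's location is expected to qualify to file:/warehouse/mydb_dir

spark.sql("CREATE TABLE mydb.t (i INT) USING parquet LOCATION 'rel/t'")
// t's location is expected to nest under the database location:
// file:/warehouse/mydb_dir/rel/t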
SessionCatalog.scala
@@ -209,12 +209,20 @@ class SessionCatalog(
"you cannot create a database with this name.")
}
validateName(dbName)
val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri)
externalCatalog.createDatabase(
dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
dbDefinition.copy(name = dbName, locationUri = makeQualifiedDBPath(dbDefinition.locationUri)),
ignoreIfExists)
}

private def makeQualifiedDBPath(locationUri: URI): URI = {
if (locationUri.isAbsolute) {
locationUri
} else {
val fullPath = new Path(conf.warehousePath, CatalogUtils.URIToString(locationUri))
makeQualifiedPath(fullPath.toUri)
}
}

def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
val dbName = formatDatabaseName(db)
if (dbName == DEFAULT_DATABASE) {
@@ -231,7 +239,8 @@
def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
val dbName = formatDatabaseName(dbDefinition.name)
requireDbExists(dbName)
externalCatalog.alterDatabase(dbDefinition.copy(name = dbName))
externalCatalog.alterDatabase(dbDefinition.copy(
name = dbName, locationUri = makeQualifiedDBPath(dbDefinition.locationUri)))
}

def getDatabaseMetadata(db: String): CatalogDatabase = {
@@ -273,8 +282,7 @@
* by users.
*/
def getDefaultDBPath(db: String): URI = {
val database = formatDatabaseName(db)
new Path(new Path(conf.warehousePath), database + ".db").toUri
CatalogUtils.stringToURI(formatDatabaseName(db) + ".db")
}

// ----------------------------------------------------------------------------
@@ -306,8 +314,7 @@
val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
&& !tableDefinition.storage.locationUri.get.isAbsolute) {
// make the location of the table qualified.
val qualifiedTableLocation =
makeQualifiedPath(tableDefinition.storage.locationUri.get)
val qualifiedTableLocation = makeQualifiedTablePath(tableDefinition.storage.locationUri.get, db)
tableDefinition.copy(
storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
identifier = tableIdentifier)
@@ -340,6 +347,16 @@
}
}

private def makeQualifiedTablePath(locationUri: URI, database: String): URI = {
if (locationUri.isAbsolute) {
locationUri
} else {
val dbName = formatDatabaseName(database)
val dbLocation = makeQualifiedDBPath(getDatabaseMetadata(dbName).locationUri)
new Path(new Path(dbLocation), CatalogUtils.URIToString(locationUri)).toUri
}
}

/**
* Alter the metadata of an existing metastore table identified by `tableDefinition`.
*
@@ -359,7 +376,7 @@
&& !tableDefinition.storage.locationUri.get.isAbsolute) {
// make the location of the table qualified.
val qualifiedTableLocation =
makeQualifiedPath(tableDefinition.storage.locationUri.get)
makeQualifiedTablePath(tableDefinition.storage.locationUri.get, db)
tableDefinition.copy(
storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
identifier = tableIdentifier)
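Taken together, the two new helpers encode a simple resolution rule: an absolute URI passes through unchanged, a relative database location is anchored at the warehouse path, and a relative table location is anchored at its (already qualified) database location. Below is a minimal standalone sketch of that rule, using plain inputs instead of SessionCatalog state and omitting the extra filesystem qualification that makeQualifiedPath performs:

import java.net.URI
import org.apache.hadoop.fs.Path

// Sketch only: mirrors the order of checks in makeQualifiedDBPath / makeQualifiedTablePath.
def resolveDbLocation(warehousePath: String, location: URI): URI =
  if (location.isAbsolute) location
  else new Path(warehousePath, location.toString).toUri

def resolveTableLocation(dbLocation: URI, location: URI): URI =
  if (location.isAbsolute) location
  else new Path(new Path(dbLocation), location.toString).toUri

For example, resolveDbLocation("/warehouse", new URI("a/b")) yields /warehouse/a/b, and passing that result to resolveTableLocation with a relative table URI nests the table directory inside the database directory.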
V2SessionCatalog.scala
@@ -126,11 +126,18 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
val schema = CatalogV2Util.applySchemaChanges(catalogTable.schema, changes)
val comment = properties.get(TableCatalog.PROP_COMMENT)
val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner)
val location = properties.get(TableCatalog.PROP_LOCATION).map(CatalogUtils.stringToURI)
val storage = if (location.isDefined) {
catalogTable.storage.copy(locationUri = location)
} else {
catalogTable.storage
}

try {
catalog.alterTable(
catalogTable
.copy(properties = properties, schema = schema, owner = owner, comment = comment))
catalogTable.copy(
properties = properties, schema = schema, owner = owner, comment = comment,
storage = storage))
} catch {
case _: NoSuchTableException =>
throw new NoSuchTableException(ident)
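The V2SessionCatalog change makes TableCatalog.PROP_LOCATION actually take effect on alterTable: the property is parsed into a URI and copied into the v1 table's storage descriptor, which SessionCatalog.alterTable can then qualify. A hedged, standalone sketch of that mapping (the helper name and inputs are illustrative, not part of the change):

import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogUtils}
import org.apache.spark.sql.connector.catalog.TableCatalog

// Sketch: copy an optional PROP_LOCATION value into a storage descriptor,
// mirroring the storage handling added to V2SessionCatalog.alterTable above.
def applyLocationProperty(
    props: Map[String, String],
    storage: CatalogStorageFormat): CatalogStorageFormat = {
  props.get(TableCatalog.PROP_LOCATION)
    .map(CatalogUtils.stringToURI)
    .map(uri => storage.copy(locationUri = Some(uri)))
    .getOrElse(storage)
}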
V2SessionCatalogSuite.scala
@@ -17,17 +17,19 @@

package org.apache.spark.sql.execution.datasources.v2

import java.net.URI
import java.util
import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, NamespaceChange, SupportsNamespaces, TableChange}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange, V1Table}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -160,6 +162,36 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
assert(catalog.tableExists(testIdent))
}

private def makeQualifiedPathWithWarehouse(path: String): URI = {
val p = new Path(spark.sessionState.conf.warehousePath, path)
val fs = p.getFileSystem(spark.sessionState.newHadoopConf())
fs.makeQualified(p).toUri

}

test("createTable: location") {
val catalog = newCatalog()
val properties = new util.HashMap[String, String]()
assert(!catalog.tableExists(testIdent))

// default location
val t1 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table]
assert(t1.catalogTable.location ===
spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier))
catalog.dropTable(testIdent)

// relative path
properties.put(TableCatalog.PROP_LOCATION, "relative/path")
val t2 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table]
assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path"))
catalog.dropTable(testIdent)

// absolute path
properties.put(TableCatalog.PROP_LOCATION, "/absolute/path")
val t3 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table]
assert(t3.catalogTable.location.toString === "file:/absolute/path")
}

test("tableExists") {
val catalog = newCatalog()

@@ -640,6 +672,26 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
assert(exc.message.contains("not found"))
}

test("alterTable: location") {
val catalog = newCatalog()
assert(!catalog.tableExists(testIdent))

// default location
val t1 = catalog.createTable(testIdent, schema, Array.empty, emptyProps).asInstanceOf[V1Table]
assert(t1.catalogTable.location ===
spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier))

// relative path
val t2 = catalog.alterTable(testIdent,
TableChange.setProperty(TableCatalog.PROP_LOCATION, "relative/path")).asInstanceOf[V1Table]
assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path"))

// absolute path
val t3 = catalog.alterTable(testIdent,
TableChange.setProperty(TableCatalog.PROP_LOCATION, "/absolute/path")).asInstanceOf[V1Table]
assert(t3.catalogTable.location.toString === "file:/absolute/path")
}

test("dropTable") {
val catalog = newCatalog()

@@ -812,11 +864,16 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {

test("createNamespace: basic behavior") {
val catalog = newCatalog()
val expectedPath = sqlContext.sessionState.catalog.getDefaultDBPath(testNs(0)).toString

val sessionCatalog = sqlContext.sessionState.catalog
val expectedPath =
new Path(spark.sessionState.conf.warehousePath,
sessionCatalog.getDefaultDBPath(testNs(0)).toString).toString

catalog.createNamespace(testNs, Map("property" -> "value").asJava)

assert(expectedPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString)
assert(expectedPath === spark.catalog.getDatabase(testNs(0)).locationUri)

assert(catalog.namespaceExists(testNs) === true)
val metadata = catalog.loadNamespaceMetadata(testNs).asScala
@@ -842,6 +899,23 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
catalog.dropNamespace(testNs)
}

test("createNamespace: relative location") {
val catalog = newCatalog()
val expectedPath =
new Path(spark.sessionState.conf.warehousePath, "a/b/c").toString

catalog.createNamespace(testNs, Map("location" -> "a/b/c").asJava)

assert(expectedPath === spark.catalog.getDatabase(testNs(0)).locationUri)

assert(catalog.namespaceExists(testNs) === true)
val metadata = catalog.loadNamespaceMetadata(testNs).asScala
checkMetadata(metadata, Map.empty)
assert(expectedPath === metadata("location"))

catalog.dropNamespace(testNs)
}

test("createNamespace: fail if namespace already exists") {
val catalog = newCatalog()

@@ -954,16 +1028,23 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {

test("alterNamespace: update namespace location") {
val catalog = newCatalog()
val initialPath = sqlContext.sessionState.catalog.getDefaultDBPath(testNs(0)).toString
val newPath = "file:/tmp/db.db"
val initialPath =
new Path(spark.sessionState.conf.warehousePath,
spark.sessionState.catalog.getDefaultDBPath(testNs(0)).toString).toString

val newAbsoluteUri = "file:/tmp/db.db"
catalog.createNamespace(testNs, emptyProps)
assert(initialPath === spark.catalog.getDatabase(testNs(0)).locationUri)
catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", newAbsoluteUri))
assert(newAbsoluteUri === spark.catalog.getDatabase(testNs(0)).locationUri)

assert(initialPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString)

catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", newPath))
val newAbsolutePath = "/tmp/newAbsolutePath"
catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", newAbsolutePath))
assert("file:" + newAbsolutePath === spark.catalog.getDatabase(testNs(0)).locationUri)

assert(newPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString)
val newRelativePath = new Path(spark.sessionState.conf.warehousePath, "relativeP").toString
catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", "relativeP"))
assert(newRelativePath === spark.catalog.getDatabase(testNs(0)).locationUri)

catalog.dropNamespace(testNs)
}
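One consequence the test updates encode: getDefaultDBPath now returns only the relative <db>.db segment, so the suites rebuild the expected absolute location by joining it with the warehouse path themselves. A hedged sketch of that expectation (the helper name is illustrative):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.SessionCatalog

// Sketch: expected location string for a database created without an explicit
// LOCATION, assuming sessionCatalog and warehousePath come from the running session.
def expectedDefaultDbLocation(
    sessionCatalog: SessionCatalog,
    warehousePath: String,
    db: String): String = {
  new Path(warehousePath, sessionCatalog.getDefaultDBPath(db).toString).toString
}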
