diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index ec0c34d4c7960..6fba3156c3919 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -219,12 +219,20 @@ class SessionCatalog(
         "you cannot create a database with this name.")
     }
     validateName(dbName)
-    val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri)
     externalCatalog.createDatabase(
-      dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
+      dbDefinition.copy(name = dbName, locationUri = makeQualifiedDBPath(dbDefinition.locationUri)),
       ignoreIfExists)
   }
 
+  private def makeQualifiedDBPath(locationUri: URI): URI = {
+    if (locationUri.isAbsolute) {
+      locationUri
+    } else {
+      val fullPath = new Path(conf.warehousePath, CatalogUtils.URIToString(locationUri))
+      makeQualifiedPath(fullPath.toUri)
+    }
+  }
+
   def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
     val dbName = formatDatabaseName(db)
     if (dbName == DEFAULT_DATABASE) {
@@ -241,7 +249,8 @@ class SessionCatalog(
   def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
     val dbName = formatDatabaseName(dbDefinition.name)
     requireDbExists(dbName)
-    externalCatalog.alterDatabase(dbDefinition.copy(name = dbName))
+    externalCatalog.alterDatabase(dbDefinition.copy(
+      name = dbName, locationUri = makeQualifiedDBPath(dbDefinition.locationUri)))
   }
 
   def getDatabaseMetadata(db: String): CatalogDatabase = {
@@ -283,8 +292,7 @@ class SessionCatalog(
    * by users.
    */
   def getDefaultDBPath(db: String): URI = {
-    val database = formatDatabaseName(db)
-    new Path(new Path(conf.warehousePath), database + ".db").toUri
+    CatalogUtils.stringToURI(formatDatabaseName(db) + ".db")
   }
 
   // ----------------------------------------------------------------------------
@@ -317,7 +325,7 @@ class SessionCatalog(
         && !tableDefinition.storage.locationUri.get.isAbsolute) {
       // make the location of the table qualified.
       val qualifiedTableLocation =
-        makeQualifiedPath(tableDefinition.storage.locationUri.get)
+        makeQualifiedTablePath(tableDefinition.storage.locationUri.get, db)
       tableDefinition.copy(
         storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
         identifier = tableIdentifier)
@@ -350,6 +358,16 @@ class SessionCatalog(
     }
   }
 
+  private def makeQualifiedTablePath(locationUri: URI, database: String): URI = {
+    if (locationUri.isAbsolute) {
+      locationUri
+    } else {
+      val dbName = formatDatabaseName(database)
+      val dbLocation = makeQualifiedDBPath(getDatabaseMetadata(dbName).locationUri)
+      new Path(new Path(dbLocation), CatalogUtils.URIToString(locationUri)).toUri
+    }
+  }
+
   /**
    * Alter the metadata of an existing metastore table identified by `tableDefinition`.
    *
@@ -369,7 +387,7 @@ class SessionCatalog(
         && !tableDefinition.storage.locationUri.get.isAbsolute) {
       // make the location of the table qualified.
       val qualifiedTableLocation =
-        makeQualifiedPath(tableDefinition.storage.locationUri.get)
+        makeQualifiedTablePath(tableDefinition.storage.locationUri.get, db)
       tableDefinition.copy(
         storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
         identifier = tableIdentifier)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index df3f231f7d0ef..6dda1d4aaf37e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -126,11 +126,18 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
     val schema = CatalogV2Util.applySchemaChanges(catalogTable.schema, changes)
     val comment = properties.get(TableCatalog.PROP_COMMENT)
     val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner)
+    val location = properties.get(TableCatalog.PROP_LOCATION).map(CatalogUtils.stringToURI)
+    val storage = if (location.isDefined) {
+      catalogTable.storage.copy(locationUri = location)
+    } else {
+      catalogTable.storage
+    }
 
     try {
       catalog.alterTable(
-        catalogTable
-          .copy(properties = properties, schema = schema, owner = owner, comment = comment))
+        catalogTable.copy(
+          properties = properties, schema = schema, owner = owner, comment = comment,
+          storage = storage))
     } catch {
       case _: NoSuchTableException =>
         throw new NoSuchTableException(ident)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
index c399a011f9073..c3bcf86c1ed27 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -17,17 +17,19 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.net.URI
 import java.util
 import java.util.Collections
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, NamespaceChange, SupportsNamespaces, TableChange}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange, V1Table}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -160,6 +162,36 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     assert(catalog.tableExists(testIdent))
   }
 
+  private def makeQualifiedPathWithWarehouse(path: String): URI = {
+    val p = new Path(spark.sessionState.conf.warehousePath, path)
+    val fs = p.getFileSystem(spark.sessionState.newHadoopConf())
+    fs.makeQualified(p).toUri
+
+  }
+
+  test("createTable: location") {
+    val catalog = newCatalog()
+    val properties = new util.HashMap[String, String]()
+
+    assert(!catalog.tableExists(testIdent))
+
+    // default location
+    val t1 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table]
+    assert(t1.catalogTable.location ===
+      spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier))
+    catalog.dropTable(testIdent)
+
+    // relative path
+    properties.put(TableCatalog.PROP_LOCATION, "relative/path")
+    val t2 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table]
+    assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path"))
+    catalog.dropTable(testIdent)
+
+    // absolute path
+    properties.put(TableCatalog.PROP_LOCATION, "/absolute/path")
+    val t3 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table]
+    assert(t3.catalogTable.location.toString === "file:/absolute/path")
+  }
+
   test("tableExists") {
     val catalog = newCatalog()
 
@@ -640,6 +672,26 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     assert(exc.message.contains("not found"))
   }
 
+  test("alterTable: location") {
+    val catalog = newCatalog()
+    assert(!catalog.tableExists(testIdent))
+
+    // default location
+    val t1 = catalog.createTable(testIdent, schema, Array.empty, emptyProps).asInstanceOf[V1Table]
+    assert(t1.catalogTable.location ===
+      spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier))
+
+    // relative path
+    val t2 = catalog.alterTable(testIdent,
+      TableChange.setProperty(TableCatalog.PROP_LOCATION, "relative/path")).asInstanceOf[V1Table]
+    assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path"))
+
+    // absolute path
+    val t3 = catalog.alterTable(testIdent,
+      TableChange.setProperty(TableCatalog.PROP_LOCATION, "/absolute/path")).asInstanceOf[V1Table]
+    assert(t3.catalogTable.location.toString === "file:/absolute/path")
+  }
+
   test("dropTable") {
     val catalog = newCatalog()
 
@@ -812,11 +864,15 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
   test("createNamespace: basic behavior") {
     val catalog = newCatalog()
-    val expectedPath = sqlContext.sessionState.catalog.getDefaultDBPath(testNs(0)).toString
+
+    val sessionCatalog = sqlContext.sessionState.catalog
+    val expectedPath =
+      new Path(spark.sessionState.conf.warehousePath,
+        sessionCatalog.getDefaultDBPath(testNs(0)).toString).toString
 
     catalog.createNamespace(testNs, Map("property" -> "value").asJava)
 
-    assert(expectedPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString)
+    assert(expectedPath === spark.catalog.getDatabase(testNs(0)).locationUri)
     assert(catalog.namespaceExists(testNs) === true)
     val metadata = catalog.loadNamespaceMetadata(testNs).asScala
@@ -842,6 +898,23 @@
     catalog.dropNamespace(testNs)
   }
 
+  test("createNamespace: relative location") {
+    val catalog = newCatalog()
+    val expectedPath =
+      new Path(spark.sessionState.conf.warehousePath, "a/b/c").toString
+
+    catalog.createNamespace(testNs, Map("location" -> "a/b/c").asJava)
+
+    assert(expectedPath === spark.catalog.getDatabase(testNs(0)).locationUri)
+
+    assert(catalog.namespaceExists(testNs) === true)
+    val metadata = catalog.loadNamespaceMetadata(testNs).asScala
+    checkMetadata(metadata, Map.empty)
+    assert(expectedPath === metadata("location"))
+
+    catalog.dropNamespace(testNs)
+  }
+
   test("createNamespace: fail if namespace already exists") {
     val catalog = newCatalog()
 
@@ -954,16 +1027,23 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
   test("alterNamespace: update namespace location") {
     val catalog = newCatalog()
-    val initialPath = sqlContext.sessionState.catalog.getDefaultDBPath(testNs(0)).toString
-    val newPath = "file:/tmp/db.db"
+    val initialPath =
+      new Path(spark.sessionState.conf.warehousePath,
+        spark.sessionState.catalog.getDefaultDBPath(testNs(0)).toString).toString
+    val newAbsoluteUri = "file:/tmp/db.db"
 
     catalog.createNamespace(testNs, emptyProps)
+    assert(initialPath === spark.catalog.getDatabase(testNs(0)).locationUri)
+    catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", newAbsoluteUri))
+    assert(newAbsoluteUri === spark.catalog.getDatabase(testNs(0)).locationUri)
 
-    assert(initialPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString)
-
-    catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", newPath))
+    val newAbsolutePath = "/tmp/newAbsolutePath"
+    catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", newAbsolutePath))
+    assert("file:" + newAbsolutePath === spark.catalog.getDatabase(testNs(0)).locationUri)
 
-    assert(newPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString)
+    val newRelativePath = new Path(spark.sessionState.conf.warehousePath, "relativeP").toString
+    catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", "relativeP"))
+    assert(newRelativePath === spark.catalog.getDatabase(testNs(0)).locationUri)
 
     catalog.dropNamespace(testNs)
   }
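For reviewers, a minimal standalone sketch of the resolution rules this patch encodes in `makeQualifiedDBPath` / `makeQualifiedTablePath`. The hard-coded `warehousePath` value, the object name, and the example URIs are illustrative only, and the sketch deliberately skips the `FileSystem.makeQualified` step the real code performs:

```scala
import java.net.URI

import org.apache.hadoop.fs.Path

object QualifyLocationsSketch {
  // Illustrative warehouse root; in Spark this comes from spark.sql.warehouse.dir.
  val warehousePath = "file:/user/hive/warehouse"

  // A relative database location resolves against the warehouse path;
  // an absolute one is kept as-is (mirrors makeQualifiedDBPath above).
  def qualifyDbPath(location: URI): URI =
    if (location.isAbsolute) location
    else new Path(warehousePath, location.toString).toUri

  // A relative table location resolves against its database's (already
  // qualified) location (mirrors makeQualifiedTablePath above).
  def qualifyTablePath(location: URI, dbLocation: URI): URI =
    if (location.isAbsolute) location
    else new Path(new Path(dbLocation), location.toString).toUri

  def main(args: Array[String]): Unit = {
    val db = qualifyDbPath(new URI("testdb.db"))
    println(db)
    // file:/user/hive/warehouse/testdb.db
    println(qualifyTablePath(new URI("relative/path"), db))
    // file:/user/hive/warehouse/testdb.db/relative/path
    println(qualifyTablePath(new URI("file:/absolute/path"), db))
    // file:/absolute/path
  }
}
```

This mirrors the expectations in the new `createTable: location` and `alterTable: location` tests above, where `relative/path` lands under `<warehouse>/db.db/` while `/absolute/path` is preserved unchanged.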