Skip to content

Commit

Permalink
[DSCCOMPUTE-1795] 支持双集群版本,创建表时location不添加scheme信息时未正常继承库路径 (apache#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
yaooqinn authored and pan3793 committed Feb 9, 2022
1 parent f3b7ebc commit a1dc7c7
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2062,7 +2062,7 @@
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<!-- 3.3.1 won't work with zinc; fails to find javac from java.home -->
<version>4.3.0</version>
<version>3.2.2</version>
<executions>
<execution>
<id>eclipse-add-source</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,20 @@ class SessionCatalog(
"you cannot create a database with this name.")
}
validateName(dbName)
val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri)
externalCatalog.createDatabase(
dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
dbDefinition.copy(name = dbName, locationUri = makeQualifiedDBPath(dbDefinition.locationUri)),
ignoreIfExists)
}

private def makeQualifiedDBPath(locationUri: URI): URI = {
if (locationUri.isAbsolute) {
locationUri
} else {
val fullPath = new Path(conf.warehousePath, CatalogUtils.URIToString(locationUri))
makeQualifiedPath(fullPath.toUri)
}
}

def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
val dbName = formatDatabaseName(db)
if (dbName == DEFAULT_DATABASE) {
Expand All @@ -222,7 +230,8 @@ class SessionCatalog(
def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
val dbName = formatDatabaseName(dbDefinition.name)
requireDbExists(dbName)
externalCatalog.alterDatabase(dbDefinition.copy(name = dbName))
externalCatalog.alterDatabase(dbDefinition.copy(
name = dbName, locationUri = makeQualifiedDBPath(dbDefinition.locationUri)))
}

def getDatabaseMetadata(db: String): CatalogDatabase = {
Expand Down Expand Up @@ -264,8 +273,7 @@ class SessionCatalog(
* by users.
*/
def getDefaultDBPath(db: String): URI = {
val database = formatDatabaseName(db)
new Path(new Path(conf.warehousePath), database + ".db").toUri
CatalogUtils.stringToURI(formatDatabaseName(db) + ".db")
}

// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -294,7 +302,7 @@ class SessionCatalog(
&& !tableDefinition.storage.locationUri.get.isAbsolute) {
// make the location of the table qualified.
val qualifiedTableLocation =
makeQualifiedPath(tableDefinition.storage.locationUri.get)
makeQualifiedTablePath(tableDefinition.storage.locationUri.get, db)
tableDefinition.copy(
storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
identifier = TableIdentifier(table, Some(db)))
Expand All @@ -306,6 +314,16 @@ class SessionCatalog(
externalCatalog.createTable(newTableDefinition, ignoreIfExists)
}

private def makeQualifiedTablePath(locationUri: URI, database: String): URI = {
if (locationUri.isAbsolute) {
locationUri
} else {
val dbName = formatDatabaseName(database)
val dbLocation = makeQualifiedDBPath(getDatabaseMetadata(dbName).locationUri)
new Path(new Path(dbLocation), CatalogUtils.URIToString(locationUri)).toUri
}
}

/**
* Alter the metadata of an existing metastore table identified by `tableDefinition`.
*
Expand All @@ -319,9 +337,19 @@ class SessionCatalog(
val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableDefinition.identifier.table)
val tableIdentifier = TableIdentifier(table, Some(db))
val newTableDefinition = tableDefinition.copy(identifier = tableIdentifier)
requireDbExists(db)
requireTableExists(tableIdentifier)
val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
&& !tableDefinition.storage.locationUri.get.isAbsolute) {
// make the location of the table qualified.
val qualifiedTableLocation =
makeQualifiedTablePath(tableDefinition.storage.locationUri.get, db)
tableDefinition.copy(
storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
identifier = tableIdentifier)
} else {
tableDefinition.copy(identifier = tableIdentifier)
}
externalCatalog.alterTable(newTableDefinition)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
// if (isUsingHiveMetastore) {
// assert(storageFormat.properties.get("path") === expected)
// }
assert(storageFormat.locationUri === Some(expected))
assert(storageFormat.locationUri ===
Some(makeQualifiedPath(CatalogUtils.URIToString(expected))))
}
// set table location
sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'")
Expand Down

0 comments on commit a1dc7c7

Please sign in to comment.