[SPARK-37546][SQL] V2 ReplaceTableAsSelect command should qualify location

### What changes were proposed in this pull request?
Currently, the v2 RTAS (REPLACE TABLE AS SELECT) command doesn't qualify the location:

```
spark.sql("REPLACE TABLE testcat.t USING foo LOCATION '/tmp/foo' AS SELECT id FROM source")
spark.sql("DESCRIBE EXTENDED testcat.t").filter("col_name = 'Location'").show

+--------+-------------+-------+
|col_name|    data_type|comment|
+--------+-------------+-------+
|Location|/tmp/foo     |       |
+--------+-------------+-------+
```
whereas the v1 command qualifies the location as `file:/tmp/foo`, which is the correct behavior since the default filesystem can change between sessions.
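
For context, "qualifying" a location means resolving it against the configured default filesystem so that the stored path carries an explicit scheme. A minimal sketch of that mechanism using Hadoop's `Path`/`FileSystem` APIs — an illustration only, not the PR's actual `makeQualifiedDBObjectPath` helper, and the `hdfs://nn:8020` endpoint is hypothetical:

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Resolve a possibly scheme-less location against the default filesystem.
def qualify(location: String, hadoopConf: Configuration): URI = {
  val path = new Path(location)
  val fs = path.getFileSystem(hadoopConf) // filesystem chosen via fs.defaultFS
  fs.makeQualified(path).toUri
}

val conf = new Configuration()
conf.set("fs.defaultFS", "file:///")
qualify("/tmp/foo", conf) // file:/tmp/foo

conf.set("fs.defaultFS", "hdfs://nn:8020")
qualify("/tmp/foo", conf) // hdfs://nn:8020/tmp/foo (needs an HDFS client on the classpath)
```

Because the unqualified string `/tmp/foo` resolves differently under each `fs.defaultFS`, storing it verbatim makes the table's location depend on whichever session later reads it.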

### Why are the changes needed?
This PR proposes to store the qualified location, so that the stored path cannot resolve differently when the default filesystem changes between sessions.

### Does this PR introduce _any_ user-facing change?
Yes. The v2 RTAS command now stores the qualified location:

```
+--------+-------------+-------+
|col_name|    data_type|comment|
+--------+-------------+-------+
|Location|file:/tmp/foo|       |
+--------+-------------+-------+
```

### How was this patch tested?
New test.

Closes #34807 from huaxingao/location.

Authored-by: Huaxin Gao <huaxin_gao@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
huaxingao authored and cloud-fan committed Dec 6, 2021
1 parent 0b959b5 commit bde47c8
Showing 2 changed files with 29 additions and 13 deletions.
```diff
@@ -99,6 +99,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper
       location, session.sharedState.hadoopConf)
   }
 
+  private def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
+    tableSpec.copy(
+      location = tableSpec.location.map(makeQualifiedDBObjectPath(_)))
+  }
+
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(project, filters, DataSourceV2ScanRelation(
         _, V1ScanWrapper(scan, pushed, pushedDownOperators), output)) =>
@@ -165,38 +170,32 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper
 
     case CreateTable(ResolvedDBObjectName(catalog, ident), schema, partitioning,
         tableSpec, ifNotExists) =>
-      val qualifiedLocation = tableSpec.location.map(makeQualifiedDBObjectPath(_))
       CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, schema,
-        partitioning, tableSpec.copy(location = qualifiedLocation), ifNotExists) :: Nil
+        partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
 
     case CreateTableAsSelect(ResolvedDBObjectName(catalog, ident), parts, query, tableSpec,
         options, ifNotExists) =>
       val writeOptions = new CaseInsensitiveStringMap(options.asJava)
-      val tableSpecWithQualifiedLocation = tableSpec.copy(
-        location = tableSpec.location.map(makeQualifiedDBObjectPath(_)))
       catalog match {
         case staging: StagingTableCatalog =>
           AtomicCreateTableAsSelectExec(staging, ident.asIdentifier, parts, query, planLater(query),
-            tableSpecWithQualifiedLocation, writeOptions, ifNotExists) :: Nil
+            qualifyLocInTableSpec(tableSpec), writeOptions, ifNotExists) :: Nil
         case _ =>
           CreateTableAsSelectExec(catalog.asTableCatalog, ident.asIdentifier, parts, query,
-            planLater(query), tableSpecWithQualifiedLocation, writeOptions, ifNotExists) :: Nil
+            planLater(query), qualifyLocInTableSpec(tableSpec), writeOptions, ifNotExists) :: Nil
       }
 
     case RefreshTable(r: ResolvedTable) =>
       RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil
 
     case ReplaceTable(ResolvedDBObjectName(catalog, ident), schema, parts, tableSpec, orCreate) =>
-      val qualifiedLocation = tableSpec.location.map(makeQualifiedDBObjectPath(_))
       catalog match {
         case staging: StagingTableCatalog =>
           AtomicReplaceTableExec(staging, ident.asIdentifier, schema, parts,
-            tableSpec.copy(location = qualifiedLocation),
-            orCreate = orCreate, invalidateCache) :: Nil
+            qualifyLocInTableSpec(tableSpec), orCreate = orCreate, invalidateCache) :: Nil
         case _ =>
           ReplaceTableExec(catalog.asTableCatalog, ident.asIdentifier, schema, parts,
-            tableSpec.copy(location = qualifiedLocation), orCreate = orCreate,
-            invalidateCache) :: Nil
+            qualifyLocInTableSpec(tableSpec), orCreate = orCreate, invalidateCache) :: Nil
      }
 
     case ReplaceTableAsSelect(ResolvedDBObjectName(catalog, ident),
@@ -210,7 +209,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper
         parts,
         query,
         planLater(query),
-        tableSpec,
+        qualifyLocInTableSpec(tableSpec),
         writeOptions,
         orCreate = orCreate,
         invalidateCache) :: Nil
@@ -221,7 +220,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper
         parts,
         query,
         planLater(query),
-        tableSpec,
+        qualifyLocInTableSpec(tableSpec),
         writeOptions,
         orCreate = orCreate,
         invalidateCache) :: Nil
```

```diff
@@ -423,6 +423,23 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("SPARK-37546: ReplaceTableAsSelect should store location as qualified") {
+    val basicIdentifier = "testcat.table_name"
+    val atomicIdentifier = "testcat_atomic.table_name"
+    Seq(basicIdentifier, atomicIdentifier).foreach { identifier =>
+      withTable(identifier) {
+        spark.sql(s"CREATE TABLE $identifier USING foo LOCATION '/tmp/foo' " +
+          "AS SELECT id, data FROM source")
+        spark.sql(s"REPLACE TABLE $identifier USING foo LOCATION '/tmp/foo' " +
+          "AS SELECT id FROM source")
+        val location = spark.sql(s"DESCRIBE EXTENDED $identifier")
+          .filter("col_name = 'Location'")
+          .select("data_type").head.getString(0)
+        assert(location === "file:/tmp/foo")
+      }
+    }
+  }
+
   test("ReplaceTableAsSelect: basic v2 implementation.") {
     val basicCatalog = catalog("testcat").asTableCatalog
     val atomicCatalog = catalog("testcat_atomic").asTableCatalog
```
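
To show the refactor in isolation: `qualifyLocInTableSpec` centralizes the `Option`-aware copy that was previously duplicated at each call site, and it leaves an absent location untouched. A self-contained sketch with a hypothetical, simplified `TableSpec` stand-in (not Spark's actual class) and a toy qualifier:

```scala
// Hypothetical stand-in for Spark's TableSpec, reduced to the relevant fields.
case class SimpleTableSpec(location: Option[String], provider: Option[String])

// Toy qualifier: prepend a scheme if none is present (Spark uses Hadoop's FS APIs).
def qualify(location: String): String =
  if (location.contains(":")) location else s"file:$location"

// The pattern the PR factors out: map over the optional location and copy the spec.
def qualifyLocInSpec(spec: SimpleTableSpec): SimpleTableSpec =
  spec.copy(location = spec.location.map(qualify))

qualifyLocInSpec(SimpleTableSpec(Some("/tmp/foo"), Some("foo")))
// => SimpleTableSpec(Some("file:/tmp/foo"), Some("foo"))
qualifyLocInSpec(SimpleTableSpec(None, Some("foo")))
// => SimpleTableSpec(None, Some("foo")) -- no location, nothing to qualify
```

Funneling every DDL case through one helper is what lets the two ReplaceTableAsSelect cases pass `qualifyLocInTableSpec(tableSpec)` instead of a bare `tableSpec`, which is the actual fix.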
