[SPARK-39543] The option of DataFrameWriterV2 should be passed to storage properties if fallback to v1

The options set on a DataFrameWriterV2 should be passed through to the table's storage properties when the write falls back to a v1 source, so that options such as compression codecs take effect.

Example:

`spark.range(0, 100).writeTo("t1").option("compression", "zstd").using("parquet").create`

**before**

generated file: part-00000-644a65ed-0e7a-43d5-8d30-b610a0fb19dc-c000.**snappy**.parquet (the option is ignored and the default `snappy` codec is used)

**after**

generated file: part-00000-6eb9d1ae-8fdb-4428-aea3-bd6553954cdd-c000.**zstd**.parquet (the requested `zstd` codec is applied)
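
A minimal sketch of verifying the new behavior from a `spark-shell` session with a session catalog (the table name `t1` and the `zstd` codec are illustrative, not part of the commit):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Create a parquet table through the DataFrameWriterV2 API.
spark.range(0, 100)
  .writeTo("t1")
  .option("compression", "zstd")
  .using("parquet")
  .create()

// With this fix the write option lands in the table's storage properties,
// so the parquet writer picks up the requested codec.
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
assert(table.storage.properties.get("compression").contains("zstd"))
```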

Does this PR introduce any user-facing change? No.

How was this patch tested? New test.

Closes #36941 from Yikf/writeV2option.

Authored-by: Yikf <yikaifei1@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit e5b7fb8)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
yikf authored and cloud-fan committed Jun 23, 2022
1 parent 22dae38 commit 725ce33
Showing 2 changed files with 27 additions and 2 deletions.
**ResolveSessionCatalog.scala**

```diff
@@ -180,9 +180,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     }

   case c @ CreateTableAsSelectStatement(
-      SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
+      SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, writeOptions, _, _, _) =>
     val (storageFormat, provider) = getStorageFormatAndProvider(
-      c.provider, c.options, c.location, c.serde, ctas = true)
+      c.provider,
+      c.options ++ writeOptions,
+      c.location,
+      c.serde,
+      ctas = true)

     if (!isV2Provider(provider)) {
       val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
         c.partitioning, c.bucketSpec, c.properties, provider, c.location,
```
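
The heart of the fix is the `c.options ++ writeOptions` merge above. A self-contained sketch of the Scala `Map` concatenation semantics it relies on (the keys and values here are made up for illustration): duplicate keys are resolved in favor of the right-hand operand, so writer-supplied options win.

```scala
// Illustrative option maps; not the actual values Spark builds.
val statementOptions = Map("compression" -> "snappy", "path" -> "/tmp/t1")
val writeOptions = Map("compression" -> "zstd")

// Map ++ keeps all keys and lets the right-hand map win on conflicts,
// so the writer-supplied codec overrides the statement-level one.
val merged = statementOptions ++ writeOptions

assert(merged("compression") == "zstd")
assert(merged("path") == "/tmp/t1")
```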
**DataFrameWriterV2Suite.scala**

```diff
@@ -23,12 +23,15 @@ import scala.collection.JavaConverters._

 import org.scalatest.BeforeAndAfter

+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
+import org.apache.spark.sql.connector.InMemoryV1Provider
 import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable, InMemoryTableCatalog, TableCatalog}
 import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.FakeSourceOne
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
@@ -531,6 +534,23 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
     assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
   }

+  test("SPARK-39543 writeOption should be passed to storage properties when fallback to v1") {
+    val provider = classOf[InMemoryV1Provider].getName
+
+    withSQLConf((SQLConf.USE_V1_SOURCE_LIST.key, provider)) {
+      spark.range(10)
+        .writeTo("table_name")
+        .option("compression", "zstd").option("name", "table_name")
+        .using(provider)
+        .create()
+      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("table_name"))
+
+      assert(table.identifier === TableIdentifier("table_name", Some("default")))
+      assert(table.storage.properties.contains("compression"))
+      assert(table.storage.properties.getOrElse("compression", "foo") == "zstd")
+    }
+  }
+
   test("Replace: basic behavior") {
     spark.sql(
       "CREATE TABLE testcat.table_name (id bigint, data string) USING foo PARTITIONED BY (id)")
```
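
The `SQLConf.USE_V1_SOURCE_LIST` key used by the test corresponds to the `spark.sql.sources.useV1SourceList` configuration. A sketch of forcing the same v1 fallback outside the test harness (the `parquet` value is illustrative):

```scala
// Treat the listed providers as v1 sources, so DataFrameWriterV2
// calls against them exercise the fallback path this commit fixes.
spark.conf.set("spark.sql.sources.useV1SourceList", "parquet")
```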
