[SPARK-30098][SQL] Add a configuration to use default datasource as provider for CREATE TABLE command #30554

@@ -2922,6 +2922,15 @@ object SQLConf {
.stringConf
.createWithDefault("")

val LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT =
buildConf("spark.sql.legacy.createHiveTableByDefault")
.internal()
.doc("When set to true, CREATE TABLE syntax without USING or STORED AS will use Hive " +
s"instead of the value of ${DEFAULT_DATA_SOURCE_NAME.key} as the table provider.")
Contributor: If this is a "legacy" key, mention how long folks can depend on it.

Member: There are already a lot of legacy configurations, and there is no plan for how long we will keep them; that is as far as I have followed in the community. Deciding the lifetime of legacy configurations would be a separate issue, but ideally they get removed at the next major release bump, I guess. I remember discussing this with Sean somewhere as well. cc @srowen FYI

Contributor: If we have a general policy for legacy configs, meaning this stays the same until Spark 4 by default, then I guess there is no need to document it here. (I don't remember that conversation, but I was out for a few months last/this year.)

.version("3.1.0")
.booleanConf
.createWithDefault(true)

/**
* Holds information about keys that have been deprecated.
*
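
As an aside between hunks: a rough sketch, not part of the diff, of the user-visible behavior the new flag toggles. It assumes a Hive-enabled session, the default spark.sql.sources.default of parquet, and invented table names.

// Illustrative sketch only; assumes spark.sql.sources.default = "parquet"
// and a Hive-enabled SparkSession bound to `spark`.
spark.conf.set("spark.sql.legacy.createHiveTableByDefault", "true")
spark.sql("CREATE TABLE t_legacy (id INT)")   // Hive serde table, as before this change (a warning is logged)

spark.conf.set("spark.sql.legacy.createHiveTableByDefault", "false")
spark.sql("CREATE TABLE t_native (id INT)")   // native data source (parquet) table

spark.sql("DESCRIBE TABLE EXTENDED t_native").show(truncate = false)  // the "Provider" row reflects the choice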
@@ -27,7 +27,7 @@ import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}

/**
@@ -636,11 +636,16 @@ class ResolveSessionCatalog(
(storageFormat, DDLUtils.HIVE_PROVIDER)
} else {
// If neither USING nor STORED AS/ROW FORMAT is specified, we create native data source
// tables if it's a CTAS and `conf.convertCTAS` is true.
// TODO: create native data source table by default for non-CTAS.
if (ctas && conf.convertCTAS) {
// tables if:
// 1. `LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT` is false, or
// 2. It's a CTAS and `conf.convertCTAS` is true.
val createHiveTableByDefault = conf.getConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT)
if (!createHiveTableByDefault || (ctas && conf.convertCTAS)) {
Contributor: Should this mark convertCTAS as deprecated, since it is superseded by the new config?

Contributor (author): Yes, I can do it in a follow-up.

(nonHiveStorageFormat, conf.defaultDataSourceName)
} else {
logWarning("A Hive serde table will be created as there is no table provider " +
s"specified. You can set ${SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key} to false " +
"so that native data source table will be created instead.")
(defaultHiveStorage, DDLUtils.HIVE_PROVIDER)
}
}
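
For readers skimming the diff, a standalone sketch (a hypothetical helper, not Spark code) of the provider decision the hunk above implements; defaultSource stands in for spark.sql.sources.default, and all names are illustrative.

// Hypothetical summary of the branch above; not part of the change.
def providerForCreateTable(
    providerSpecified: Boolean,         // USING or STORED AS / ROW FORMAT present
    isCTAS: Boolean,
    createHiveTableByDefault: Boolean,  // the new flag, default true
    convertCTAS: Boolean,
    defaultSource: String = "parquet"): String = {
  if (providerSpecified) {
    "as written in the statement"       // resolved before reaching this branch
  } else if (!createHiveTableByDefault || (isCTAS && convertCTAS)) {
    defaultSource                       // native data source table
  } else {
    "hive"                              // legacy behavior, with the warning above logged
  }
}
// providerForCreateTable(false, isCTAS = false, createHiveTableByDefault = true, convertCTAS = false)  // "hive"
// providerForCreateTable(false, isCTAS = true, createHiveTableByDefault = true, convertCTAS = true)    // "parquet"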
@@ -266,22 +266,23 @@ class DataSourceV2SQLSuite
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty)
}

// TODO: ignored by SPARK-31707, restore the test after create table syntax unification
ignore("CreateTable: without USING clause") {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
val testCatalog = catalog("testcat").asTableCatalog

sql("CREATE TABLE testcat.t1 (id int)")
val t1 = testCatalog.loadTable(Identifier.of(Array(), "t1"))
// Spark shouldn't set the default provider for catalog plugins.
assert(!t1.properties.containsKey(TableCatalog.PROP_PROVIDER))

sql("CREATE TABLE t2 (id int)")
val t2 = spark.sessionState.catalogManager.v2SessionCatalog.asTableCatalog
.loadTable(Identifier.of(Array("default"), "t2")).asInstanceOf[V1Table]
// Spark should set the default provider as DEFAULT_DATA_SOURCE_NAME for the session catalog.
assert(t2.v1Table.provider == Some(conf.defaultDataSourceName))
test("CreateTable: without USING clause") {
withSQLConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key -> "false") {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
val testCatalog = catalog("testcat").asTableCatalog

sql("CREATE TABLE testcat.t1 (id int)")
val t1 = testCatalog.loadTable(Identifier.of(Array(), "t1"))
// Spark shouldn't set the default provider for catalog plugins.
assert(!t1.properties.containsKey(TableCatalog.PROP_PROVIDER))

sql("CREATE TABLE t2 (id int)")
val t2 = spark.sessionState.catalogManager.v2SessionCatalog.asTableCatalog
.loadTable(Identifier.of(Array("default"), "t2")).asInstanceOf[V1Table]
// Spark should set the default provider as DEFAULT_DATA_SOURCE_NAME for the session catalog.
assert(t2.v1Table.provider == Some(conf.defaultDataSourceName))
}
}

test("CreateTable/RepalceTable: invalid schema if has interval type") {
@@ -1588,7 +1588,7 @@ class PlanResolutionSuite extends AnalysisTest {
.add("b", StringType)
)
)
compare("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) " +
compare("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) STORED AS textfile " +
"PARTITIONED BY (c INT, d STRING COMMENT 'test2')",
createTable(
table = "my_tab",
@@ -1616,7 +1616,7 @@
)
// Partitioned by a StructType should be accepted by `SparkSqlParser` but will fail an analyze
// rule in `AnalyzeCreateTable`.
compare("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) " +
compare("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) STORED AS textfile " +
"PARTITIONED BY (nested STRUCT<col1: STRING,col2: INT>)",
createTable(
table = "my_tab",
@@ -1890,7 +1890,7 @@ }
}

test("Test CTAS #3") {
val s3 = """CREATE TABLE page_view AS SELECT * FROM src"""
val s3 = """CREATE TABLE page_view STORED AS textfile AS SELECT * FROM src"""
val (desc, exists) = extractTableDesc(s3)
assert(exists == false)
assert(desc.identifier.database == Some("default"))
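
The remaining suite updates follow a single pattern: tests that previously relied on a bare CREATE TABLE producing a Hive serde table now name the provider explicitly, either with USING hive or with STORED AS. A minimal sketch of the two spellings, with invented table names:

// Illustrative only; table names are invented.
spark.sql("CREATE TABLE pt_hive (id BIGINT, data STRING) USING hive PARTITIONED BY (part STRING)")
spark.sql("CREATE TABLE pt_text (c STRING) STORED AS textfile PARTITIONED BY (d STRING)")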
@@ -40,6 +40,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning
private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled
private val originalSessionLocalTimeZone = TestHive.conf.sessionLocalTimeZone
private val originalCreateHiveTable =
TestHive.conf.getConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT)

def testCases: Seq[(String, File)] = {
hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)
@@ -59,6 +61,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
// Fix session local timezone to America/Los_Angeles for those timezone sensitive tests
// (timestamp_*)
TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "America/Los_Angeles")
TestHive.setConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT, true)
RuleExecutor.resetMetrics()
}

@@ -69,6 +72,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, originalCrossJoinEnabled)
TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, originalSessionLocalTimeZone)
TestHive.setConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT, originalCreateHiveTable)

// For debugging dump some statistics about how much time was spent in various optimizer rules
logWarning(RuleExecutor.dumpTimeSpent())
@@ -21,10 +21,26 @@ import org.apache.spark.sql.{AnalysisException, ShowCreateTableSuite}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}

class HiveShowCreateTableSuite extends ShowCreateTableSuite with TestHiveSingleton {

private var origCreateHiveTableConfig = false

protected override def beforeAll(): Unit = {
super.beforeAll()
origCreateHiveTableConfig =
spark.conf.get(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT)
spark.conf.set(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key, true)
}

protected override def afterAll(): Unit = {
spark.conf.set(
SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key,
origCreateHiveTableConfig)
super.afterAll()
}

test("view") {
Seq(true, false).foreach { serde =>
withView("v1") {
@@ -277,7 +277,8 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
test("Test partition mode = strict") {
withSQLConf(("hive.exec.dynamic.partition.mode", "strict")) {
withTable("partitioned") {
sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
sql("CREATE TABLE partitioned (id bigint, data string) USING hive " +
"PARTITIONED BY (part string)")
val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd"))
.toDF("id", "data", "part")

@@ -38,7 +38,7 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
testData.createOrReplaceTempView("testData")

// create the table for test
sql(s"CREATE TABLE table_with_partition(key int,value string) " +
sql(s"CREATE TABLE table_with_partition(key int,value string) USING hive " +
s"PARTITIONED by (ds string) location '${tmpDir.toURI}' ")
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') " +
"SELECT key,value FROM testData")
@@ -81,7 +81,8 @@

test("SPARK-21739: Cast expression should initialize timezoneId") {
withTable("table_with_timestamp_partition") {
sql("CREATE TABLE table_with_timestamp_partition(value int) PARTITIONED BY (ts TIMESTAMP)")
sql("CREATE TABLE table_with_timestamp_partition(value int) USING hive " +
"PARTITIONED BY (ts TIMESTAMP)")
sql("INSERT OVERWRITE TABLE table_with_timestamp_partition " +
"PARTITION (ts = '2010-01-01 00:00:00.000') VALUES (1)")

@@ -165,7 +165,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
// Partitioned table
val partTable = "part_table"
withTable(partTable) {
sql(s"CREATE TABLE $partTable (key STRING, value STRING) PARTITIONED BY (ds STRING)")
sql(s"CREATE TABLE $partTable (key STRING, value STRING) USING hive " +
"PARTITIONED BY (ds STRING)")
sql(s"INSERT INTO TABLE $partTable PARTITION (ds='2010-01-01') SELECT * FROM src")
sql(s"INSERT INTO TABLE $partTable PARTITION (ds='2010-01-02') SELECT * FROM src")
sql(s"INSERT INTO TABLE $partTable PARTITION (ds='2010-01-03') SELECT * FROM src")
@@ -191,7 +192,8 @@
SQLConf.PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION.key -> "True") {
val checkSizeTable = "checkSizeTable"
withTable(checkSizeTable) {
sql(s"CREATE TABLE $checkSizeTable (key STRING, value STRING) PARTITIONED BY (ds STRING)")
sql(s"CREATE TABLE $checkSizeTable (key STRING, value STRING) USING hive " +
"PARTITIONED BY (ds STRING)")
sql(s"INSERT INTO TABLE $checkSizeTable PARTITION (ds='2010-01-01') SELECT * FROM src")
sql(s"INSERT INTO TABLE $checkSizeTable PARTITION (ds='2010-01-02') SELECT * FROM src")
sql(s"INSERT INTO TABLE $checkSizeTable PARTITION (ds='2010-01-03') SELECT * FROM src")
@@ -274,7 +276,8 @@
test("SPARK-22745 - read Hive's statistics for partition") {
val tableName = "hive_stats_part_table"
withTable(tableName) {
sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)")
sql(s"CREATE TABLE $tableName (key STRING, value STRING) USING hive " +
"PARTITIONED BY (ds STRING)")
sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2017-01-01') SELECT * FROM src")
var partition = spark.sessionState.catalog
.getPartition(TableIdentifier(tableName), Map("ds" -> "2017-01-01"))
@@ -296,7 +299,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
val tableName = "analyzeTable_part"
withTable(tableName) {
withTempPath { path =>
sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)")
sql(s"CREATE TABLE $tableName (key STRING, value STRING) USING hive " +
"PARTITIONED BY (ds STRING)")

val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
partitionDates.foreach { ds =>
@@ -321,6 +325,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
sql(
s"""
|CREATE TABLE $sourceTableName (key STRING, value STRING)
|USING hive
|PARTITIONED BY (ds STRING)
|LOCATION '${path.toURI}'
""".stripMargin)
@@ -338,6 +343,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
sql(
s"""
|CREATE TABLE $tableName (key STRING, value STRING)
|USING hive
|PARTITIONED BY (ds STRING)
|LOCATION '${path.toURI}'
""".stripMargin)
@@ -371,7 +377,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
}

withTable(tableName) {
sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)")
sql(s"CREATE TABLE $tableName (key STRING, value STRING) USING hive " +
"PARTITIONED BY (ds STRING)")

createPartition("2010-01-01", "SELECT '1', 'A' from src")
createPartition("2010-01-02", "SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src")
@@ -424,7 +431,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
}

withTable(tableName) {
sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING, hr INT)")
sql(s"CREATE TABLE $tableName (key STRING, value STRING) USING hive " +
"PARTITIONED BY (ds STRING, hr INT)")

createPartition("2010-01-01", 10, "SELECT '1', 'A' from src")
createPartition("2010-01-01", 11, "SELECT '1', 'A' from src")
@@ -472,7 +480,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
}

withTable(tableName) {
sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING, hr INT)")
sql(s"CREATE TABLE $tableName (key STRING, value STRING) USING hive " +
"PARTITIONED BY (ds STRING, hr INT)")

createPartition("2010-01-01", 10, "SELECT '1', 'A' from src")
createPartition("2010-01-01", 11, "SELECT '1', 'A' from src")
@@ -961,7 +970,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
Seq(false, true).foreach { autoUpdate =>
withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> autoUpdate.toString) {
withTable(table) {
sql(s"CREATE TABLE $table (i INT, j STRING) PARTITIONED BY (ds STRING, hr STRING)")
sql(s"CREATE TABLE $table (i INT, j STRING) USING hive " +
"PARTITIONED BY (ds STRING, hr STRING)")
// table has two partitions initially
for (ds <- Seq("2008-04-08"); hr <- Seq("11", "12")) {
sql(s"INSERT OVERWRITE TABLE $table PARTITION (ds='$ds',hr='$hr') SELECT 1, 'a'")
@@ -1034,6 +1044,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
sql(
s"""
|CREATE TABLE $managedTable (key INT, value STRING)
|USING hive
|PARTITIONED BY (ds STRING, hr STRING)
""".stripMargin)

@@ -798,6 +798,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
versionSpark.sql(
"""
|CREATE TABLE tbl(c1 string)
|USING hive
|PARTITIONED BY (ds STRING)
""".stripMargin)
versionSpark.sql("INSERT OVERWRITE TABLE tbl partition (ds='2') SELECT '1'")
@@ -983,7 +983,7 @@ class HiveDDLSuite
}

test("alter table partition - storage information") {
sql("CREATE TABLE boxes (height INT, length INT) PARTITIONED BY (width INT)")
sql("CREATE TABLE boxes (height INT, length INT) STORED AS textfile PARTITIONED BY (width INT)")
sql("INSERT OVERWRITE TABLE boxes PARTITION (width=4) SELECT 4, 4")
val catalog = spark.sessionState.catalog
val expectedSerde = "com.sparkbricks.serde.ColumnarSerDe"
@@ -88,15 +88,16 @@ class HiveSerDeSuite extends HiveComparisonTest with PlanTest with BeforeAndAfte
test("Test the default fileformat for Hive-serde tables") {
withSQLConf("hive.default.fileformat" -> "orc") {
val (desc, exists) = extractTableDesc(
"CREATE TABLE IF NOT EXISTS fileformat_test (id int)")
"CREATE TABLE IF NOT EXISTS fileformat_test (id int) USING hive")
assert(exists)
assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"))
assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
assert(desc.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
}

withSQLConf("hive.default.fileformat" -> "parquet") {
val (desc, exists) = extractTableDesc("CREATE TABLE IF NOT EXISTS fileformat_test (id int)")
val (desc, exists) = extractTableDesc(
"CREATE TABLE IF NOT EXISTS fileformat_test (id int) USING hive")
assert(exists)
val input = desc.storage.inputFormat
val output = desc.storage.outputFormat
@@ -113,6 +113,7 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
sql(
s"""
|CREATE TABLE $table(id string)
|USING hive
|PARTITIONED BY (p1 string,p2 string,p3 string,p4 string,p5 string)
""".stripMargin)
sql(
@@ -157,6 +158,7 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
sql(
s"""
|CREATE TABLE $table(id string)
|USING hive
|PARTITIONED BY (p1 string,p2 string,p3 string,p4 string,p5 string)
""".stripMargin)
sql(
@@ -182,6 +184,7 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
sql(
s"""
|CREATE TABLE $table (id int)
|USING hive
|PARTITIONED BY (a int, b int)
""".stripMargin)
val scan1 = getHiveTableScanExec(s"SELECT * FROM $table WHERE a = 1 AND b = 2")
@@ -252,7 +255,7 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
test("SPARK-32069: Improve error message on reading unexpected directory") {
withTable("t") {
withTempDir { f =>
sql(s"CREATE TABLE t(i LONG) LOCATION '${f.getAbsolutePath}'")
sql(s"CREATE TABLE t(i LONG) USING hive LOCATION '${f.getAbsolutePath}'")
sql("INSERT INTO t VALUES(1)")
val dir = new File(f.getCanonicalPath + "/data")
dir.mkdir()
@@ -2026,6 +2026,7 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
sql(
"""
|CREATE TABLE part_table (c STRING)
|STORED AS textfile
|PARTITIONED BY (d STRING)
""".stripMargin)
sql(s"LOAD DATA LOCAL INPATH '$path/part-r-000011' " +