[SPARK-30098][SQL] Use default datasource as provider for CREATE TABLE syntax

### What changes were proposed in this pull request?

In this PR, we propose to use the value of `spark.sql.sources.default` as the provider for the `CREATE TABLE` syntax instead of `hive` in Spark 3.0.

And to help the migration, we introduce a legacy conf `spark.sql.legacy.createHiveTableByDefault.enabled`, which defaults to `false`.
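For example, a session that still wants the old behavior can flip the legacy flag before running its DDL (a minimal `spark-shell` sketch; the table name is illustrative):

```
// Restore the pre-3.0 behavior for the current session
spark.sql("SET spark.sql.legacy.createHiveTableByDefault.enabled=true")

// With the flag enabled, CREATE TABLE without USING creates a Hive table again
spark.sql("CREATE TABLE legacy_tab (c1 INT)")
```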

### Why are the changes needed?

1. Currently, the `CREATE TABLE` syntax uses the Hive provider to create tables, while the `DataFrameWriter.saveAsTable` API uses the value of `spark.sql.sources.default` as the provider. It would be better to make them consistent.

2. Users may get confused in some cases. For example:

```
CREATE TABLE t1 (c1 INT) USING PARQUET;
CREATE TABLE t2 (c1 INT);
```

Given these two DDLs, users may think that `t2` also uses Parquet as its provider, since Spark always advertises Parquet as the default format. However, it is Hive in this case (see the quick check after this list).

On the other hand, if we omit the USING clause in a CTAS statement, we do pick Parquet by default when `spark.sql.hive.convertCTAS=true`:

```
CREATE TABLE t3 USING PARQUET AS SELECT 1 AS VALUE;
CREATE TABLE t4 AS SELECT 1 AS VALUE;
```
And these two cases together can be really confusing.

3. Spark SQL is now mature and widely used in its own right; it does not need to be fully consistent with Hive's behavior.
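As a quick check of the confusion described in point 2, `DESCRIBE TABLE EXTENDED` reports the provider each table was actually created with (a sketch against a Hive-enabled Spark 2.4 session; output trimmed to the relevant row):

```
spark.sql("DESCRIBE TABLE EXTENDED t1").show(100, false)  // Provider: parquet
spark.sql("DESCRIBE TABLE EXTENDED t2").show(100, false)  // Provider: hive
```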

### Does this PR introduce any user-facing change?

Yes. Before this PR, the `CREATE TABLE` syntax used the Hive provider; now it uses the value of `spark.sql.sources.default` as its provider.
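A rough `spark-shell` sketch of the new behavior (table names are illustrative; both tables pick up the default data source, Parquet unless `spark.sql.sources.default` is changed):

```
spark.sql("CREATE TABLE t_sql (id BIGINT)")  // provider comes from spark.sql.sources.default
spark.range(10).write.saveAsTable("t_api")   // DataFrameWriter.saveAsTable uses the same default
spark.sql("DESCRIBE TABLE EXTENDED t_sql").show(100, false)
```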

### How was this patch tested?

Added tests in `DDLParserSuite` and `HiveDDLSuite`.

Closes #26736 from Ngone51/dev-create-table-using-parquet-by-default.

Lead-authored-by: wuyi <yi.wu@databricks.com>
Co-authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Ngone51 authored and cloud-fan committed Dec 6, 2019
1 parent c1a5f94 commit 58be82a
Showing 25 changed files with 261 additions and 186 deletions.
2 changes: 2 additions & 0 deletions docs/sql-migration-guide.md
@@ -253,6 +253,8 @@ license: |
    </td>
  </tr>
</table>

- Since Spark 3.0, CREATE TABLE without a specific provider will use the value of `spark.sql.sources.default` as its provider. In Spark version 2.4 and earlier, it was hive. To restore the behavior before Spark 3.0, you can set `spark.sql.legacy.createHiveTableByDefault.enabled` to `true`.


- Since Spark 3.0, the unary arithmetic operator plus(`+`) only accepts string, numeric and interval type values as inputs. Besides, `+` with a integral string representation will be coerced to double value, e.g. `+'1'` results `1.0`. In Spark version 2.4 and earlier, this operator is ignored. There is no type checking for it, thus, all type values with a `+` prefix are valid, e.g. `+ array(1, 2)` is valid and results `[1, 2]`. Besides, there is no type coercion for it at all, e.g. in Spark 2.4, the result of `+'1'` is string `1`.


@@ -29,6 +29,12 @@ grammar SqlBase;
   */
  public boolean legacy_exponent_literal_as_decimal_enabled = false;
/**
* When false, CREATE TABLE syntax without a provider will use
* the value of spark.sql.sources.default as its provider.
*/
public boolean legacy_create_hive_table_by_default_enabled = false;
  /**
   * Verify whether current token is a valid decimal token (which contains dot).
   * Returns true if the character that follows the token is not a digit or letter or underscore.
@@ -101,13 +107,13 @@ statement
        (RESTRICT | CASCADE)?                                          #dropNamespace
    | SHOW (DATABASES | NAMESPACES) ((FROM | IN) multipartIdentifier)?
        (LIKE? pattern=STRING)?                                        #showNamespaces
-   | createTableHeader ('(' colTypeList ')')? tableProvider
-       ((OPTIONS options=tablePropertyList) |
-       (PARTITIONED BY partitioning=transformList) |
-       bucketSpec |
-       locationSpec |
-       (COMMENT comment=STRING) |
-       (TBLPROPERTIES tableProps=tablePropertyList))*
-       (AS? query)?                                                   #createTable
+   | {!legacy_create_hive_table_by_default_enabled}?
+       createTableHeader ('(' colTypeList ')')? tableProvider?
+       createTableClauses
+       (AS? query)?                                                   #createTable
+   | {legacy_create_hive_table_by_default_enabled}?
+       createTableHeader ('(' colTypeList ')')? tableProvider
+       createTableClauses
+       (AS? query)?                                                   #createTable
    | createTableHeader ('(' columns=colTypeList ')')?
        ((COMMENT comment=STRING) |
@@ -128,12 +134,7 @@ statement
        locationSpec |
        (TBLPROPERTIES tableProps=tablePropertyList))*                 #createTableLike
    | replaceTableHeader ('(' colTypeList ')')? tableProvider
-       ((OPTIONS options=tablePropertyList) |
-       (PARTITIONED BY partitioning=transformList) |
-       bucketSpec |
-       locationSpec |
-       (COMMENT comment=STRING) |
-       (TBLPROPERTIES tableProps=tablePropertyList))*
+       createTableClauses
        (AS? query)?                                                   #replaceTable
    | ANALYZE TABLE multipartIdentifier partitionSpec? COMPUTE STATISTICS
        (identifier | FOR COLUMNS identifierSeq | FOR ALL COLUMNS)?    #analyze
@@ -352,6 +353,15 @@ tableProvider
    : USING multipartIdentifier
    ;


createTableClauses
:((OPTIONS options=tablePropertyList) |
(PARTITIONED BY partitioning=transformList) |
bucketSpec |
locationSpec |
(COMMENT comment=STRING) |
(TBLPROPERTIES tableProps=tablePropertyList))*
;

tablePropertyList
    : '(' tableProperty (',' tableProperty)* ')'
    ;
@@ -2379,6 +2379,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
   */
  type TableHeader = (Seq[String], Boolean, Boolean, Boolean)


/**
* Type to keep track of table clauses:
* (partitioning, bucketSpec, options, locationSpec, properties, comment).
*/
type TableClauses = (Seq[Transform], Option[BucketSpec], Map[String, String],
Map[String, String], Option[String], Option[String])

  /**
   * Validate a create table statement and return the [[TableIdentifier]].
   */
@@ -2614,6 +2621,24 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
      ctx.EXTENDED != null)
  }


override def visitCreateTableClauses(ctx: CreateTableClausesContext): TableClauses = {
checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)

val partitioning: Seq[Transform] =
Option(ctx.partitioning).map(visitTransformList).getOrElse(Nil)
val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
val comment = Option(ctx.comment).map(string)
(partitioning, bucketSpec, properties, options, location, comment)
}

  /**
   * Create a table, returning a [[CreateTableStatement]] logical plan.
   *
@@ -2639,26 +2664,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
    val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
    if (external) {
-     operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
+     operationNotAllowed("CREATE EXTERNAL TABLE ...", ctx)
    }

-   checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
-   checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
-   checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
-   checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
-   checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
-   checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
-
    val schema = Option(ctx.colTypeList()).map(createSchema)
-   val partitioning: Seq[Transform] =
-     Option(ctx.partitioning).map(visitTransformList).getOrElse(Nil)
-   val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
-   val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
-   val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
-
-   val provider = ctx.tableProvider.multipartIdentifier.getText
-   val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
-   val comment = Option(ctx.comment).map(string)
+   val defaultProvider = conf.defaultDataSourceName
+   val provider =
+     Option(ctx.tableProvider).map(_.multipartIdentifier.getText).getOrElse(defaultProvider)
+   val (partitioning, bucketSpec, properties, options, location, comment) =
+     visitCreateTableClauses(ctx.createTableClauses())

    Option(ctx.query).map(plan) match {
      case Some(_) if temp =>
@@ -2713,23 +2726,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
      operationNotAllowed("REPLACE EXTERNAL TABLE ... USING", ctx)
    }

-   checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
-   checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
-   checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
-   checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
-   checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
-   checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
-
+   val (partitioning, bucketSpec, properties, options, location, comment) =
+     visitCreateTableClauses(ctx.createTableClauses())
    val schema = Option(ctx.colTypeList()).map(createSchema)
-   val partitioning: Seq[Transform] =
-     Option(ctx.partitioning).map(visitTransformList).getOrElse(Nil)
-   val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
-   val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
-   val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
-
    val provider = ctx.tableProvider.multipartIdentifier.getText
-   val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
-   val comment = Option(ctx.comment).map(string)
    val orCreate = ctx.replaceTableHeader().CREATE() != null

    Option(ctx.query).map(plan) match {
@@ -101,6 +101,7 @@ abstract class AbstractSqlParser(conf: SQLConf) extends ParserInterface with Log
    lexer.addErrorListener(ParseErrorListener)
    lexer.legacy_setops_precedence_enbled = conf.setOpsPrecedenceEnforced
    lexer.legacy_exponent_literal_as_decimal_enabled = conf.exponentLiteralAsDecimalEnabled
lexer.legacy_create_hive_table_by_default_enabled = conf.createHiveTableByDefaultEnabled
    lexer.SQL_standard_keyword_behavior = SQLStandardKeywordBehavior

    val tokenStream = new CommonTokenStream(lexer)
@@ -110,6 +111,7 @@ abstract class AbstractSqlParser(conf: SQLConf) extends ParserInterface with Log
    parser.addErrorListener(ParseErrorListener)
    parser.legacy_setops_precedence_enbled = conf.setOpsPrecedenceEnforced
    parser.legacy_exponent_literal_as_decimal_enabled = conf.exponentLiteralAsDecimalEnabled
parser.legacy_create_hive_table_by_default_enabled = conf.createHiveTableByDefaultEnabled
    parser.SQL_standard_keyword_behavior = SQLStandardKeywordBehavior

    try {
@@ -1966,6 +1966,15 @@ object SQLConf {
    .booleanConf
    .createWithDefault(false)


val LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED =
buildConf("spark.sql.legacy.createHiveTableByDefault.enabled")
.internal()
.doc("When set to true, CREATE TABLE syntax without a provider will use hive " +
s"instead of the value of ${DEFAULT_DATA_SOURCE_NAME.key}.")
.booleanConf
.createWithDefault(false)


  val LEGACY_INTEGRALDIVIDE_RETURN_LONG = buildConf("spark.sql.legacy.integralDivide.returnBigint")
    .doc("If it is set to true, the div operator returns always a bigint. This behavior was " +
      "inherited from Hive. Otherwise, the return type is the data type of the operands.")

@@ -2583,6 +2592,9 @@ class SQLConf extends Serializable with Logging {
  def exponentLiteralAsDecimalEnabled: Boolean =
    getConf(SQLConf.LEGACY_EXPONENT_LITERAL_AS_DECIMAL_ENABLED)


def createHiveTableByDefaultEnabled: Boolean =
getConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED)

  def integralDivideReturnLong: Boolean = getConf(SQLConf.LEGACY_INTEGRALDIVIDE_RETURN_LONG)

  def nameNonStructGroupingKeyAsValue: Boolean =
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String


@@ -48,6 +49,26 @@ class DDLParserSuite extends AnalysisTest {
    comparePlans(parsePlan(sql), expected, checkAnalysis = false)
  }


test("SPARK-30098: create table without provider should " +
"use default data source under non-legacy mode") {
val createSql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING)"
val defaultProvider = conf.defaultDataSourceName
val expectedPlan = CreateTableStatement(
Seq("my_tab"),
new StructType()
.add("a", IntegerType, nullable = true, "test")
.add("b", StringType),
Seq.empty[Transform],
None,
Map.empty[String, String],
defaultProvider,
Map.empty[String, String],
None,
None,
false)
parseCompare(createSql, expectedPlan)
}

test("create/replace table using - schema") { test("create/replace table using - schema") {
val createSql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet" val createSql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet"
val replaceSql = "REPLACE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet" val replaceSql = "REPLACE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet"
Expand Down
@@ -187,22 +187,15 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    if (external) {
      operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
    }

-   checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
-   checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
-   checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
-   checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
-   checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
-   checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
-
    if (ifNotExists) {
      // Unlike CREATE TEMPORARY VIEW USING, CREATE TEMPORARY TABLE USING does not support
      // IF NOT EXISTS. Users are not allowed to replace the existing temp table.
      operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
    }

-   val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
-   val provider = ctx.tableProvider.multipartIdentifier.getText
+   val (_, _, _, options, _, _) = visitCreateTableClauses(ctx.createTableClauses())
+   val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText).getOrElse(
+     throw new ParseException("CREATE TEMPORARY TABLE without a provider is not allowed.", ctx))
    val schema = Option(ctx.colTypeList()).map(createSchema)

    logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " +
@@ -46,8 +46,7 @@ CREATE TABLE view_base_table (key int /* PRIMARY KEY */, data varchar(20))
-- !query 4 schema
struct<>
-- !query 4 output
-org.apache.spark.sql.AnalysisException
-Hive support is required to CREATE Hive TABLE (AS SELECT);




-- !query 5 -- !query 5
Expand All @@ -57,7 +56,7 @@ CREATE VIEW key_dependent_view AS
struct<>
-- !query 5 output
org.apache.spark.sql.AnalysisException
-Table or view not found: view_base_table; line 2 pos 17
+expression 'default.view_base_table.`data`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;




-- !query 6
@@ -159,7 +159,7 @@ class SparkSqlParserSuite extends AnalysisTest {
  }


test("create table - schema") { test("create table - schema") {
assertEqual("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING)", assertEqual("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) STORED AS textfile",
createTable( createTable(
table = "my_tab", table = "my_tab",
schema = (new StructType) schema = (new StructType)
@@ -179,7 +179,8 @@
        partitionColumnNames = Seq("c", "d")
      )
    )
-   assertEqual("CREATE TABLE my_tab(id BIGINT, nested STRUCT<col1: STRING,col2: INT>)",
+   assertEqual("CREATE TABLE my_tab(id BIGINT, nested STRUCT<col1: STRING,col2: INT>) " +
+     "STORED AS textfile",
      createTable(
        table = "my_tab",
        schema = (new StructType)
