diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 06ac37b7f83ed..779562a0dd9b0 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -266,8 +266,8 @@ createFileFormat
     ;
 
 fileFormat
-    : INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING (SERDE serdeCls=STRING)? #tableFileFormat
-    | identifier                                                                   #genericFileFormat
+    : INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING #tableFileFormat
+    | identifier                                          #genericFileFormat
     ;
 
 storageHandler
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 2966eefd07c77..fdf4718f30c86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -781,14 +781,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    *
    * Expected format:
    * {{{
-   *   CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
-   *   [(col1 data_type [COMMENT col_comment], ...)]
+   *   CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
+   *   [(col1[:] data_type [COMMENT col_comment], ...)]
    *   [COMMENT table_comment]
-   *   [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)]
-   *   [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS]
-   *   [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...) [STORED AS DIRECTORIES]]
+   *   [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
    *   [ROW FORMAT row_format]
-   *   [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]]
+   *   [STORED AS file_format]
    *   [LOCATION path]
    *   [TBLPROPERTIES (property_name=property_value, ...)]
    *   [AS select_statement];
@@ -834,6 +832,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         compressed = false,
         serdeProperties = Map())
     }
+    validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
     val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
       .getOrElse(EmptyStorageFormat)
     val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat)
@@ -890,6 +889,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
   /**
    * Create a [[CatalogStorageFormat]] for creating tables.
+   *
+   * Format: STORED AS ...
    */
   override def visitCreateFileFormat(
       ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
@@ -917,9 +918,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
     EmptyStorageFormat.copy(
       inputFormat = Option(string(ctx.inFmt)),
-      outputFormat = Option(string(ctx.outFmt)),
-      serde = Option(ctx.serdeCls).map(string)
-    )
+      outputFormat = Option(string(ctx.outFmt)))
   }
 
   /**
@@ -1003,6 +1002,53 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     EmptyStorageFormat.copy(serdeProperties = entries.toMap)
   }
 
+  /**
+   * Throw a [[ParseException]] if the user specified incompatible SerDes through ROW FORMAT
+   * and STORED AS.
+   *
+   * The following are allowed. Anything else is not:
+   *   ROW FORMAT SERDE ... STORED AS [SEQUENCEFILE | RCFILE | TEXTFILE]
+   *   ROW FORMAT DELIMITED ... STORED AS TEXTFILE
+   *   ROW FORMAT ... STORED AS INPUTFORMAT ... OUTPUTFORMAT ...
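+   *
+   * For example (illustrative statements only; the column list is arbitrary):
+   *   CREATE TABLE t (c INT) ROW FORMAT SERDE 'serde' STORED AS RCFILE    -- allowed
+   *   CREATE TABLE t (c INT) ROW FORMAT DELIMITED STORED AS PARQUET       -- rejected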
+   */
+  private def validateRowFormatFileFormat(
+      rowFormatCtx: RowFormatContext,
+      createFileFormatCtx: CreateFileFormatContext,
+      parentCtx: ParserRuleContext): Unit = {
+    if (rowFormatCtx == null || createFileFormatCtx == null) {
+      return
+    }
+    (rowFormatCtx, createFileFormatCtx.fileFormat) match {
+      case (_, ffTable: TableFileFormatContext) => // OK
+      case (rfSerde: RowFormatSerdeContext, ffGeneric: GenericFileFormatContext) =>
+        ffGeneric.identifier.getText.toLowerCase match {
+          case ("sequencefile" | "textfile" | "rcfile") => // OK
+          case fmt =>
+            throw operationNotAllowed(
+              s"ROW FORMAT SERDE is incompatible with format '$fmt', which also specifies a serde",
+              parentCtx)
+        }
+      case (rfDelimited: RowFormatDelimitedContext, ffGeneric: GenericFileFormatContext) =>
+        ffGeneric.identifier.getText.toLowerCase match {
+          case "textfile" => // OK
+          case fmt => throw operationNotAllowed(
+            s"ROW FORMAT DELIMITED is only compatible with 'textfile', not '$fmt'", parentCtx)
+        }
+      case _ =>
+        // should never happen
+        def str(ctx: ParserRuleContext): String = {
+          (0 until ctx.getChildCount).map { i => ctx.getChild(i).getText }.mkString(" ")
+        }
+        throw operationNotAllowed(
+          s"Unexpected combination of ${str(rowFormatCtx)} and ${str(createFileFormatCtx)}",
+          parentCtx)
+    }
+  }
+
   /**
    * Create or replace a view. This creates a [[CreateViewCommand]] command.
    *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 708b878c843a7..6a659615e7d5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.command
 
+import scala.reflect.{classTag, ClassTag}
+
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, FunctionResource}
 import org.apache.spark.sql.catalyst.catalog.FunctionResourceType
@@ -25,9 +27,10 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsing}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 
+
 // TODO: merge this with DDLSuite (SPARK-14441)
 class DDLCommandSuite extends PlanTest {
   private val parser = new SparkSqlParser(new SQLConf)
@@ -40,6 +43,15 @@ class DDLCommandSuite extends PlanTest {
     containsThesePhrases.foreach { p => assert(e.getMessage.toLowerCase.contains(p.toLowerCase)) }
   }
 
+  private def parseAs[T: ClassTag](query: String): T = {
+    parser.parsePlan(query) match {
+      case t: T => t
+      case other =>
+        fail(s"Expected to parse ${classTag[T].runtimeClass} from query, " +
+          s"got ${other.getClass.getName}: $query")
+    }
+  }
+
   test("create database") {
     val sql =
       """
@@ -225,19 +237,72 @@ class DDLCommandSuite extends PlanTest {
     comparePlans(parsed4, expected4)
   }
 
+  test("create table - row format and table file format") {
+    val createTableStart = "CREATE TABLE my_tab ROW FORMAT"
+    val fileFormat = s"STORED AS INPUTFORMAT 'inputfmt' OUTPUTFORMAT 'outputfmt'"
+    val query1 = s"$createTableStart SERDE 'anything' $fileFormat"
+    val query2 = s"$createTableStart DELIMITED FIELDS TERMINATED BY ' ' $fileFormat"
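+
+    // An explicit INPUTFORMAT/OUTPUTFORMAT pair specifies no serde of its own, so
+    // either ROW FORMAT variant may be combined with it (both queries parse below).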
$fileFormat" + + // No conflicting serdes here, OK + val parsed1 = parseAs[CreateTableCommand](query1) + assert(parsed1.table.storage.serde == Some("anything")) + assert(parsed1.table.storage.inputFormat == Some("inputfmt")) + assert(parsed1.table.storage.outputFormat == Some("outputfmt")) + val parsed2 = parseAs[CreateTableCommand](query2) + assert(parsed2.table.storage.serde.isEmpty) + assert(parsed2.table.storage.inputFormat == Some("inputfmt")) + assert(parsed2.table.storage.outputFormat == Some("outputfmt")) + } + + test("create table - row format serde and generic file format") { + val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile") + val supportedSources = Set("sequencefile", "rcfile", "textfile") + + allSources.foreach { s => + val query = s"CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS $s" + if (supportedSources.contains(s)) { + val ct = parseAs[CreateTableCommand](query) + val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf) + assert(hiveSerde.isDefined) + assert(ct.table.storage.serde == Some("anything")) + assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat) + assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat) + } else { + assertUnsupported(query, Seq("row format serde", "incompatible", s)) + } + } + } + + test("create table - row format delimited and generic file format") { + val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile") + val supportedSources = Set("textfile") + + allSources.foreach { s => + val query = s"CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS $s" + if (supportedSources.contains(s)) { + val ct = parseAs[CreateTableCommand](query) + val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf) + assert(hiveSerde.isDefined) + assert(ct.table.storage.serde == hiveSerde.get.serde) + assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat) + assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat) + } else { + assertUnsupported(query, Seq("row format delimited", "only compatible with 'textfile'", s)) + } + } + } + test("create external table - location must be specified") { assertUnsupported( sql = "CREATE EXTERNAL TABLE my_tab", containsThesePhrases = Seq("create external table", "location")) val query = "CREATE EXTERNAL TABLE my_tab LOCATION '/something/anything'" - parser.parsePlan(query) match { - case ct: CreateTableCommand => - assert(ct.table.tableType == CatalogTableType.EXTERNAL) - assert(ct.table.storage.locationUri == Some("/something/anything")) - case other => - fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + - s"got ${other.getClass.getName}: $query") - } + val ct = parseAs[CreateTableCommand](query) + assert(ct.table.tableType == CatalogTableType.EXTERNAL) + assert(ct.table.storage.locationUri == Some("/something/anything")) } test("create table - property values must be set") { @@ -252,14 +314,9 @@ class DDLCommandSuite extends PlanTest { test("create table - location implies external") { val query = "CREATE TABLE my_tab LOCATION '/something/anything'" - parser.parsePlan(query) match { - case ct: CreateTableCommand => - assert(ct.table.tableType == CatalogTableType.EXTERNAL) - assert(ct.table.storage.locationUri == Some("/something/anything")) - case other => - fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + - s"got ${other.getClass.getName}: $query") - } + val ct = parseAs[CreateTableCommand](query) + 
+    assert(ct.table.tableType == CatalogTableType.EXTERNAL)
+    assert(ct.table.storage.locationUri == Some("/something/anything"))
   }
 
   test("create table using - with partitioned by") {
@@ -551,8 +613,7 @@ class DDLCommandSuite extends PlanTest {
 
   test("alter table: set file format (not allowed)") {
     assertUnsupported(
-      "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " +
-        "OUTPUTFORMAT 'test' SERDE 'test'")
+      "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' OUTPUTFORMAT 'test'")
     assertUnsupported(
       "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
         "SET FILEFORMAT PARQUET")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 30ad392969b4e..96c8fa6b70501 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -61,7 +61,7 @@ class HiveDDLCommandSuite extends PlanTest {
        |country STRING COMMENT 'country of origination')
        |COMMENT 'This is the staging page view table'
        |PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the day')
-       |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\054' STORED AS RCFILE
+       |STORED AS RCFILE
        |LOCATION '/user/external/page_view'
        |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
        |AS SELECT * FROM src""".stripMargin
@@ -88,8 +88,6 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.partitionColumns ==
       CatalogColumn("dt", "string", comment = Some("date type")) ::
       CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
-    assert(desc.storage.serdeProperties ==
-      Map((serdeConstants.SERIALIZATION_FORMAT, "\u002C"), (serdeConstants.FIELD_DELIM, "\u002C")))
     assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
     assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
     assert(desc.storage.serde ==