From 796adc3407094b1a1e7adfc10690d3164e9453be Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 11 May 2016 16:53:39 -0700 Subject: [PATCH 1/8] Throw exception on conflicting SerDes --- .../spark/sql/execution/SparkSqlParser.scala | 48 ++++++++++++++++++- .../execution/command/DDLCommandSuite.scala | 18 +++++++ 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index a51665f838e3a..b287df1bee85e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -776,6 +776,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { compressed = false, serdeProperties = Map()) } + validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx) val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat) .getOrElse(EmptyStorageFormat) val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat) @@ -825,11 +826,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { /** * Create a [[CatalogStorageFormat]] for creating tables. + * + * Format: STORED AS ... */ override def visitCreateFileFormat( ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { (ctx.fileFormat, ctx.storageHandler) match { - // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format + // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format (SERDE serde) case (c: TableFileFormatContext, null) => visitTableFileFormat(c) // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO @@ -938,6 +941,49 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { EmptyStorageFormat.copy(serdeProperties = entries.toMap) } + /** + * Throw a [[ParseException]] if the user specified incompatible SerDes through ROW FORMAT + * and STORED AS. + */ + private def validateRowFormatFileFormat( + rowFormatCtx: RowFormatContext, + createFileFormatCtx: CreateFileFormatContext, + parentCtx: ParserRuleContext): Unit = { + val cff = createFileFormatContextString(createFileFormatCtx) + (rowFormatCtx, createFileFormatCtx.fileFormat) match { + case (rf, null) => // only row format, no conflict + case (null, ff) => // only file format, no conflict + case (rfSerde: RowFormatSerdeContext, ffTable: TableFileFormatContext) => + if (visitTableFileFormat(ffTable).serde.isDefined) { + throw operationNotAllowed(s"ROW FORMAT SERDE is not compatible with $cff", parentCtx) + } + case (rfSerde: RowFormatSerdeContext, ffGeneric: GenericFileFormatContext) => + ffGeneric.identifier.getText.toLowerCase match { + case ("sequencefile" | "textfile" | "rcfile") => // OK + case _ => throw operationNotAllowed( + s"ROW FORMAT SERDE is not compatible with $cff", parentCtx) + } + case (rfDelimited: RowFormatDelimitedContext, ffTable: TableFileFormatContext) => + throw operationNotAllowed(s"ROW FORMAT DELIMITED is not compatible with $cff", parentCtx) + case (rfDelimited: RowFormatDelimitedContext, ffGeneric: GenericFileFormatContext) => + ffGeneric.identifier.getText.toLowerCase match { + case "textfile" => // OK + case _ => throw operationNotAllowed( + s"ROW FORMAT SERDE is not compatible with $cff", parentCtx) + } + case (rf, ff) => + // should never happen + throw operationNotAllowed(s"Unexpected combination of ROW FORMAT and $cff", parentCtx) + } + } + + /** + * Helper method to convert a [[CreateFileFormatContext]] to a human-readable form. + */ + private def createFileFormatContextString(ctx: CreateFileFormatContext): String = { + (0 until ctx.getChildCount).map { i => ctx.getChild(i).getText }.mkString(" ") + } + /** * Create or replace a view. This creates a [[CreateViewCommand]] command. * diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index fa8dabfe1a5d6..8277a7f095b87 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -212,6 +212,24 @@ class DDLCommandSuite extends PlanTest { comparePlans(parsed4, expected4) } + test("create table - conflicting row format and file format") { + assertUnsupported( + sql = "CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS parquet LOCATION '/path'", + containsThesePhrases = Seq("row format", "not compatible", "stored as parquet")) + val query = "CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS textfile" + parser.parsePlan(query) match { + case ct: CreateTable => + assert(ct.table.storage.serde == Some("anything")) + assert(ct.table.storage.inputFormat == + Some("org.apache.hadoop.mapred.TextInputFormat")) + assert(ct.table.storage.outputFormat == + Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) + case other => + fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + s"got ${other.getClass.getName}: $query") + } + } + test("create external table - location must be specified") { assertUnsupported( sql = "CREATE EXTERNAL TABLE my_tab", From 62c27f2f9a0dc516f3c4d39cad78c37532666764 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 11 May 2016 17:21:20 -0700 Subject: [PATCH 2/8] Add more tests --- .../spark/sql/execution/SparkSqlParser.scala | 6 +- .../execution/command/DDLCommandSuite.scala | 87 ++++++++++++++++--- 2 files changed, 77 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index b287df1bee85e..82f67f20b618c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -953,9 +953,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { (rowFormatCtx, createFileFormatCtx.fileFormat) match { case (rf, null) => // only row format, no conflict case (null, ff) => // only file format, no conflict - case (rfSerde: RowFormatSerdeContext, ffTable: TableFileFormatContext) => + case (_, ffTable: TableFileFormatContext) => if (visitTableFileFormat(ffTable).serde.isDefined) { - throw operationNotAllowed(s"ROW FORMAT SERDE is not compatible with $cff", parentCtx) + throw operationNotAllowed(s"ROW FORMAT is not compatible with $cff", parentCtx) } case (rfSerde: RowFormatSerdeContext, ffGeneric: GenericFileFormatContext) => ffGeneric.identifier.getText.toLowerCase match { @@ -963,8 +963,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { case _ => throw operationNotAllowed( s"ROW FORMAT SERDE is not compatible with $cff", parentCtx) } - case (rfDelimited: RowFormatDelimitedContext, ffTable: TableFileFormatContext) => - throw operationNotAllowed(s"ROW FORMAT DELIMITED is not compatible with $cff", parentCtx) case (rfDelimited: RowFormatDelimitedContext, ffGeneric: GenericFileFormatContext) => ffGeneric.identifier.getText.toLowerCase match { case "textfile" => // OK diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 8277a7f095b87..d30a44ff7cbda 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.execution.SparkSqlParser -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} // TODO: merge this with DDLSuite (SPARK-14441) @@ -212,21 +212,84 @@ class DDLCommandSuite extends PlanTest { comparePlans(parsed4, expected4) } - test("create table - conflicting row format and file format") { - assertUnsupported( - sql = "CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS parquet LOCATION '/path'", - containsThesePhrases = Seq("row format", "not compatible", "stored as parquet")) - val query = "CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS textfile" - parser.parsePlan(query) match { + test("create table - row format and table file format") { + val createTableStart = "CREATE TABLE my_tab ROW FORMAT" + val fileFormat = s"STORED AS INPUTFORMAT 'inputfmt' OUTPUTFORMAT 'outputfmt'" + val fileFormatWithSerde = fileFormat + " SERDE 'myserde'" + val query1 = s"$createTableStart SERDE 'anything' $fileFormat" + val query2 = s"$createTableStart SERDE 'anything' $fileFormatWithSerde" + val query3 = s"$createTableStart DELIMITED FIELDS TERMINATED BY ' ' $fileFormat" + val query4 = s"$createTableStart DELIMITED FIELDS TERMINATED BY ' ' $fileFormatWithSerde" + + parser.parsePlan(query1) match { case ct: CreateTable => assert(ct.table.storage.serde == Some("anything")) - assert(ct.table.storage.inputFormat == - Some("org.apache.hadoop.mapred.TextInputFormat")) - assert(ct.table.storage.outputFormat == - Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) + assert(ct.table.storage.inputFormat == Some("inputfmt")) + assert(ct.table.storage.outputFormat == Some("outputfmt")) case other => fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + - s"got ${other.getClass.getName}: $query") + s"got ${other.getClass.getName}: $query1") + } + + parser.parsePlan(query3) match { + case ct: CreateTable => + assert(ct.table.storage.serde.isEmpty) + assert(ct.table.storage.inputFormat == Some("inputfmt")) + assert(ct.table.storage.outputFormat == Some("outputfmt")) + case other => + fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + s"got ${other.getClass.getName}: $query1") + } + + assertUnsupported(query2, Seq("row format", "not compatible", "stored as", "myserde")) + assertUnsupported(query4, Seq("row format", "not compatible", "stored as", "myserde")) + } + + test("create table - row format serde and generic file format") { + val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile") + val supportedSources = Set("sequencefile", "rcfile", "textfile") + + allSources.foreach { s => + val query = s"CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS $s" + if (supportedSources.contains(s)) { + parser.parsePlan(query) match { + case ct: CreateTable => + val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf) + assert(hiveSerde.isDefined) + assert(ct.table.storage.serde == Some("anything")) + assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat) + assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat) + case other => + fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + s"got ${other.getClass.getName}: $query") + } + } else { + assertUnsupported(query, Seq("row format", "not compatible", s"stored as $s")) + } + } + } + + test("create table - row format delimited and generic file format") { + val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile") + val supportedSources = Set("textfile") + + allSources.foreach { s => + val query = s"CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS $s" + if (supportedSources.contains(s)) { + parser.parsePlan(query) match { + case ct: CreateTable => + val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf) + assert(hiveSerde.isDefined) + assert(ct.table.storage.serde == hiveSerde.get.serde) + assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat) + assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat) + case other => + fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + s"got ${other.getClass.getName}: $query") + } + } else { + assertUnsupported(query, Seq("row format", "not compatible", s"stored as $s")) + } } } From bf6b23d6d840d97ecc9376b7937cd3f17e3a68e8 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 11 May 2016 17:28:56 -0700 Subject: [PATCH 3/8] Clean up duplicate code a little --- .../execution/command/DDLCommandSuite.scala | 87 ++++++++----------- 1 file changed, 37 insertions(+), 50 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index d30a44ff7cbda..17631ca7b5912 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.command +import scala.reflect.{classTag, ClassTag} + import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, FunctionResource} import org.apache.spark.sql.catalyst.catalog.FunctionResourceType @@ -39,6 +41,15 @@ class DDLCommandSuite extends PlanTest { containsThesePhrases.foreach { p => assert(e.getMessage.toLowerCase.contains(p.toLowerCase)) } } + private def parseAs[T: ClassTag](query: String): T = { + parser.parsePlan(query) match { + case t: T => t + case other => + fail(s"Expected to parse ${classTag[T].runtimeClass} from query," + + s"got ${other.getClass.getName}: $query") + } + } + test("create database") { val sql = """ @@ -221,26 +232,17 @@ class DDLCommandSuite extends PlanTest { val query3 = s"$createTableStart DELIMITED FIELDS TERMINATED BY ' ' $fileFormat" val query4 = s"$createTableStart DELIMITED FIELDS TERMINATED BY ' ' $fileFormatWithSerde" - parser.parsePlan(query1) match { - case ct: CreateTable => - assert(ct.table.storage.serde == Some("anything")) - assert(ct.table.storage.inputFormat == Some("inputfmt")) - assert(ct.table.storage.outputFormat == Some("outputfmt")) - case other => - fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + - s"got ${other.getClass.getName}: $query1") - } - - parser.parsePlan(query3) match { - case ct: CreateTable => - assert(ct.table.storage.serde.isEmpty) - assert(ct.table.storage.inputFormat == Some("inputfmt")) - assert(ct.table.storage.outputFormat == Some("outputfmt")) - case other => - fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + - s"got ${other.getClass.getName}: $query1") - } - + // No conflicting serdes here, OK + val parsed1 = parseAs[CreateTable](query1) + assert(parsed1.table.storage.serde == Some("anything")) + assert(parsed1.table.storage.inputFormat == Some("inputfmt")) + assert(parsed1.table.storage.outputFormat == Some("outputfmt")) + val parsed3 = parseAs[CreateTable](query3) + assert(parsed3.table.storage.serde.isEmpty) + assert(parsed3.table.storage.inputFormat == Some("inputfmt")) + assert(parsed3.table.storage.outputFormat == Some("outputfmt")) + + // File format specified a SerDe, not OK assertUnsupported(query2, Seq("row format", "not compatible", "stored as", "myserde")) assertUnsupported(query4, Seq("row format", "not compatible", "stored as", "myserde")) } @@ -252,17 +254,12 @@ class DDLCommandSuite extends PlanTest { allSources.foreach { s => val query = s"CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS $s" if (supportedSources.contains(s)) { - parser.parsePlan(query) match { - case ct: CreateTable => - val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf) - assert(hiveSerde.isDefined) - assert(ct.table.storage.serde == Some("anything")) - assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat) - assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat) - case other => - fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + - s"got ${other.getClass.getName}: $query") - } + val ct = parseAs[CreateTable](query) + val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf) + assert(hiveSerde.isDefined) + assert(ct.table.storage.serde == Some("anything")) + assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat) + assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat) } else { assertUnsupported(query, Seq("row format", "not compatible", s"stored as $s")) } @@ -276,17 +273,12 @@ class DDLCommandSuite extends PlanTest { allSources.foreach { s => val query = s"CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS $s" if (supportedSources.contains(s)) { - parser.parsePlan(query) match { - case ct: CreateTable => - val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf) - assert(hiveSerde.isDefined) - assert(ct.table.storage.serde == hiveSerde.get.serde) - assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat) - assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat) - case other => - fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + - s"got ${other.getClass.getName}: $query") - } + val ct = parseAs[CreateTable](query) + val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf) + assert(hiveSerde.isDefined) + assert(ct.table.storage.serde == hiveSerde.get.serde) + assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat) + assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat) } else { assertUnsupported(query, Seq("row format", "not compatible", s"stored as $s")) } @@ -298,14 +290,9 @@ class DDLCommandSuite extends PlanTest { sql = "CREATE EXTERNAL TABLE my_tab", containsThesePhrases = Seq("create external table", "location")) val query = "CREATE EXTERNAL TABLE my_tab LOCATION '/something/anything'" - parser.parsePlan(query) match { - case ct: CreateTable => - assert(ct.table.tableType == CatalogTableType.EXTERNAL) - assert(ct.table.storage.locationUri == Some("/something/anything")) - case other => - fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + - s"got ${other.getClass.getName}: $query") - } + val ct = parseAs[CreateTable](query) + assert(ct.table.tableType == CatalogTableType.EXTERNAL) + assert(ct.table.storage.locationUri == Some("/something/anything")) } // ALTER TABLE table_name RENAME TO new_table_name; From bc2a7bd5f7118f0ff7fa8aa6bf689896db056ef3 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 11 May 2016 17:34:18 -0700 Subject: [PATCH 4/8] Remove unnecessary method --- .../apache/spark/sql/execution/SparkSqlParser.scala | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index af775a51dc337..aba8cf380f401 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -951,7 +951,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { rowFormatCtx: RowFormatContext, createFileFormatCtx: CreateFileFormatContext, parentCtx: ParserRuleContext): Unit = { - val cff = createFileFormatContextString(createFileFormatCtx) + val cff = (0 until createFileFormatCtx.getChildCount) + .map { i => createFileFormatCtx.getChild(i).getText } + .mkString(" ") (rowFormatCtx, createFileFormatCtx.fileFormat) match { case (rf, null) => // only row format, no conflict case (null, ff) => // only file format, no conflict @@ -977,13 +979,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } } - /** - * Helper method to convert a [[CreateFileFormatContext]] to a human-readable form. - */ - private def createFileFormatContextString(ctx: CreateFileFormatContext): String = { - (0 until ctx.getChildCount).map { i => ctx.getChild(i).getText }.mkString(" ") - } - /** * Create or replace a view. This creates a [[CreateViewCommand]] command. * From 4ffaf59f48816caa85f7dcdf0a5660591e0829cf Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 11 May 2016 17:58:23 -0700 Subject: [PATCH 5/8] Fix NPE --- .../org/apache/spark/sql/execution/SparkSqlParser.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index aba8cf380f401..d32bd9125278f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -951,12 +951,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { rowFormatCtx: RowFormatContext, createFileFormatCtx: CreateFileFormatContext, parentCtx: ParserRuleContext): Unit = { + if (rowFormatCtx == null || createFileFormatCtx == null) { + return + } val cff = (0 until createFileFormatCtx.getChildCount) .map { i => createFileFormatCtx.getChild(i).getText } .mkString(" ") (rowFormatCtx, createFileFormatCtx.fileFormat) match { - case (rf, null) => // only row format, no conflict - case (null, ff) => // only file format, no conflict case (_, ffTable: TableFileFormatContext) => if (visitTableFileFormat(ffTable).serde.isDefined) { throw operationNotAllowed(s"ROW FORMAT is not compatible with $cff", parentCtx) From 7e02a0accda4a0be924a621c683ea8e1d0fa484b Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 11 May 2016 18:55:42 -0700 Subject: [PATCH 6/8] Fix test --- .../scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index 538e218f7e2df..d796156c2311d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -61,7 +61,7 @@ class HiveDDLCommandSuite extends PlanTest { |country STRING COMMENT 'country of origination') |COMMENT 'This is the staging page view table' |PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the day') - |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\054' STORED AS RCFILE + |STORED AS RCFILE |LOCATION '/user/external/page_view' |TBLPROPERTIES ('p1'='v1', 'p2'='v2') |AS SELECT * FROM src""".stripMargin @@ -88,8 +88,6 @@ class HiveDDLCommandSuite extends PlanTest { assert(desc.partitionColumns == CatalogColumn("dt", "string", comment = Some("date type")) :: CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil) - assert(desc.storage.serdeProperties == - Map((serdeConstants.SERIALIZATION_FORMAT, "\u002C"), (serdeConstants.FIELD_DELIM, "\u002C"))) assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) assert(desc.storage.serde == From 87266e932d8aed951a24dcc613575bfc1be265f9 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 20 May 2016 14:09:46 -0700 Subject: [PATCH 7/8] Remove SERDE from STORED AS --- .../spark/sql/catalyst/parser/SqlBase.g4 | 4 ++-- .../spark/sql/execution/SparkSqlParser.scala | 21 +++++++------------ .../execution/command/DDLCommandSuite.scala | 20 ++++++------------ 3 files changed, 15 insertions(+), 30 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 06ac37b7f83ed..779562a0dd9b0 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -266,8 +266,8 @@ createFileFormat ; fileFormat - : INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING (SERDE serdeCls=STRING)? #tableFileFormat - | identifier #genericFileFormat + : INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING #tableFileFormat + | identifier #genericFileFormat ; storageHandler diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index dc6234231500d..9f2019a458be6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -781,14 +781,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { * * Expected format: * {{{ - * CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name - * [(col1 data_type [COMMENT col_comment], ...)] + * CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name + * [(col1[:] data_type [COMMENT col_comment], ...)] * [COMMENT table_comment] - * [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)] - * [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS] - * [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...) [STORED AS DIRECTORIES]] + * [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)] * [ROW FORMAT row_format] - * [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]] + * [STORED AS file_format] * [LOCATION path] * [TBLPROPERTIES (property_name=property_value, ...)] * [AS select_statement]; @@ -897,7 +895,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { override def visitCreateFileFormat( ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { (ctx.fileFormat, ctx.storageHandler) match { - // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format (SERDE serde) + // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format case (c: TableFileFormatContext, null) => visitTableFileFormat(c) // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO @@ -920,9 +918,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { EmptyStorageFormat.copy( inputFormat = Option(string(ctx.inFmt)), - outputFormat = Option(string(ctx.outFmt)), - serde = Option(ctx.serdeCls).map(string) - ) + outputFormat = Option(string(ctx.outFmt))) } /** @@ -1021,10 +1017,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { .map { i => createFileFormatCtx.getChild(i).getText } .mkString(" ") (rowFormatCtx, createFileFormatCtx.fileFormat) match { - case (_, ffTable: TableFileFormatContext) => - if (visitTableFileFormat(ffTable).serde.isDefined) { - throw operationNotAllowed(s"ROW FORMAT is not compatible with $cff", parentCtx) - } + case (_, ffTable: TableFileFormatContext) => // OK case (rfSerde: RowFormatSerdeContext, ffGeneric: GenericFileFormatContext) => ffGeneric.identifier.getText.toLowerCase match { case ("sequencefile" | "textfile" | "rcfile") => // OK diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index e138b7ad090d7..98e26b8ff8bb3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -240,25 +240,18 @@ class DDLCommandSuite extends PlanTest { test("create table - row format and table file format") { val createTableStart = "CREATE TABLE my_tab ROW FORMAT" val fileFormat = s"STORED AS INPUTFORMAT 'inputfmt' OUTPUTFORMAT 'outputfmt'" - val fileFormatWithSerde = fileFormat + " SERDE 'myserde'" val query1 = s"$createTableStart SERDE 'anything' $fileFormat" - val query2 = s"$createTableStart SERDE 'anything' $fileFormatWithSerde" - val query3 = s"$createTableStart DELIMITED FIELDS TERMINATED BY ' ' $fileFormat" - val query4 = s"$createTableStart DELIMITED FIELDS TERMINATED BY ' ' $fileFormatWithSerde" + val query2 = s"$createTableStart DELIMITED FIELDS TERMINATED BY ' ' $fileFormat" // No conflicting serdes here, OK val parsed1 = parseAs[CreateTableCommand](query1) assert(parsed1.table.storage.serde == Some("anything")) assert(parsed1.table.storage.inputFormat == Some("inputfmt")) assert(parsed1.table.storage.outputFormat == Some("outputfmt")) - val parsed3 = parseAs[CreateTableCommand](query3) - assert(parsed3.table.storage.serde.isEmpty) - assert(parsed3.table.storage.inputFormat == Some("inputfmt")) - assert(parsed3.table.storage.outputFormat == Some("outputfmt")) - - // File format specified a SerDe, not OK - assertUnsupported(query2, Seq("row format", "not compatible", "stored as", "myserde")) - assertUnsupported(query4, Seq("row format", "not compatible", "stored as", "myserde")) + val parsed2 = parseAs[CreateTableCommand](query2) + assert(parsed2.table.storage.serde.isEmpty) + assert(parsed2.table.storage.inputFormat == Some("inputfmt")) + assert(parsed2.table.storage.outputFormat == Some("outputfmt")) } test("create table - row format serde and generic file format") { @@ -615,8 +608,7 @@ class DDLCommandSuite extends PlanTest { test("alter table: set file format (not allowed)") { assertUnsupported( - "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " + - "OUTPUTFORMAT 'test' SERDE 'test'") + "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' OUTPUTFORMAT 'test'") assertUnsupported( "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " + "SET FILEFORMAT PARQUET") From 4589b6a3930f42ed9d3c52eb316d2ffd62766a89 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 20 May 2016 14:31:54 -0700 Subject: [PATCH 8/8] Address the rest of the comments --- .../spark/sql/execution/SparkSqlParser.scala | 27 ++++++++++++------- .../execution/command/DDLCommandSuite.scala | 4 +-- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 9f2019a458be6..fdf4718f30c86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -1005,6 +1005,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { /** * Throw a [[ParseException]] if the user specified incompatible SerDes through ROW FORMAT * and STORED AS. + * + * The following are allowed. Anything else is not: + * ROW FORMAT SERDE ... STORED AS [SEQUENCEFILE | RCFILE | TEXTFILE] + * ROW FORMAT DELIMITED ... STORED AS TEXTFILE + * ROW FORMAT ... STORED AS INPUTFORMAT ... OUTPUTFORMAT ... */ private def validateRowFormatFileFormat( rowFormatCtx: RowFormatContext, @@ -1013,26 +1018,30 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { if (rowFormatCtx == null || createFileFormatCtx == null) { return } - val cff = (0 until createFileFormatCtx.getChildCount) - .map { i => createFileFormatCtx.getChild(i).getText } - .mkString(" ") (rowFormatCtx, createFileFormatCtx.fileFormat) match { case (_, ffTable: TableFileFormatContext) => // OK case (rfSerde: RowFormatSerdeContext, ffGeneric: GenericFileFormatContext) => ffGeneric.identifier.getText.toLowerCase match { case ("sequencefile" | "textfile" | "rcfile") => // OK - case _ => throw operationNotAllowed( - s"ROW FORMAT SERDE is not compatible with $cff", parentCtx) + case fmt => + throw operationNotAllowed( + s"ROW FORMAT SERDE is incompatible with format '$fmt', which also specifies a serde", + parentCtx) } case (rfDelimited: RowFormatDelimitedContext, ffGeneric: GenericFileFormatContext) => ffGeneric.identifier.getText.toLowerCase match { case "textfile" => // OK - case _ => throw operationNotAllowed( - s"ROW FORMAT SERDE is not compatible with $cff", parentCtx) + case fmt => throw operationNotAllowed( + s"ROW FORMAT DELIMITED is only compatible with 'textfile', not '$fmt'", parentCtx) } - case (rf, ff) => + case _ => // should never happen - throw operationNotAllowed(s"Unexpected combination of ROW FORMAT and $cff", parentCtx) + def str(ctx: ParserRuleContext): String = { + (0 until ctx.getChildCount).map { i => ctx.getChild(i).getText }.mkString(" ") + } + throw operationNotAllowed( + s"Unexpected combination of ${str(rowFormatCtx)} and ${str(createFileFormatCtx)}", + parentCtx) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 98e26b8ff8bb3..6a659615e7d5f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -268,7 +268,7 @@ class DDLCommandSuite extends PlanTest { assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat) assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat) } else { - assertUnsupported(query, Seq("row format", "not compatible", s"stored as $s")) + assertUnsupported(query, Seq("row format serde", "incompatible", s)) } } } @@ -287,7 +287,7 @@ class DDLCommandSuite extends PlanTest { assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat) assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat) } else { - assertUnsupported(query, Seq("row format", "not compatible", s"stored as $s")) + assertUnsupported(query, Seq("row format delimited", "only compatible with 'textfile'", s)) } } }