Skip to content

Commit

Permalink
[SPARK-15279][SQL] Catch conflicting SerDe when creating table
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

The user may do something like:
```
CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS PARQUET
CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS ... SERDE 'myserde'
CREATE TABLE my_tab ROW FORMAT DELIMITED ... STORED AS ORC
CREATE TABLE my_tab ROW FORMAT DELIMITED ... STORED AS ... SERDE 'myserde'
```
None of these should be allowed because the SerDes conflict. As of this patch:
- `ROW FORMAT DELIMITED` is only compatible with `TEXTFILE`
- `ROW FORMAT SERDE` is only compatible with `TEXTFILE`, `RCFILE` and `SEQUENCEFILE`

## How was this patch tested?

New tests in `DDLCommandSuite`.

Author: Andrew Or <andrew@databricks.com>

Closes #13068 from andrewor14/row-format-conflict.

(cherry picked from commit 2585d2b)
Signed-off-by: Andrew Or <andrew@databricks.com>
  • Loading branch information
Andrew Or committed May 23, 2016
1 parent 655d882 commit c55a39c
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 33 deletions.
Expand Up @@ -267,8 +267,8 @@ createFileFormat
;

fileFormat
: INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING (SERDE serdeCls=STRING)? #tableFileFormat
| identifier #genericFileFormat
: INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING #tableFileFormat
| identifier #genericFileFormat
;

storageHandler
Expand Down
Expand Up @@ -796,14 +796,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
*
* Expected format:
* {{{
* CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
* [(col1 data_type [COMMENT col_comment], ...)]
* CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
* [(col1[:] data_type [COMMENT col_comment], ...)]
* [COMMENT table_comment]
* [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)]
* [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS]
* [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...) [STORED AS DIRECTORIES]]
* [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
* [ROW FORMAT row_format]
* [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]]
* [STORED AS file_format]
* [LOCATION path]
* [TBLPROPERTIES (property_name=property_value, ...)]
* [AS select_statement];
Expand Down Expand Up @@ -849,6 +847,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
compressed = false,
serdeProperties = Map())
}
validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
.getOrElse(EmptyStorageFormat)
val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat)
Expand Down Expand Up @@ -905,6 +904,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {

/**
* Create a [[CatalogStorageFormat]] for creating tables.
*
* Format: STORED AS ...
*/
override def visitCreateFileFormat(
ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
Expand Down Expand Up @@ -932,9 +933,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
EmptyStorageFormat.copy(
inputFormat = Option(string(ctx.inFmt)),
outputFormat = Option(string(ctx.outFmt)),
serde = Option(ctx.serdeCls).map(string)
)
outputFormat = Option(string(ctx.outFmt)))
}

/**
Expand Down Expand Up @@ -1018,6 +1017,49 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
EmptyStorageFormat.copy(serdeProperties = entries.toMap)
}

/**
 * Throw a [[ParseException]] if the user specified incompatible SerDes through ROW FORMAT
 * and STORED AS.
 *
 * The following are allowed. Anything else is not:
 *   ROW FORMAT SERDE ... STORED AS [SEQUENCEFILE | RCFILE | TEXTFILE]
 *   ROW FORMAT DELIMITED ... STORED AS TEXTFILE
 *   ROW FORMAT ... STORED AS INPUTFORMAT ... OUTPUTFORMAT ...
 *
 * When either context is null there is nothing to cross-check and this method does nothing.
 *
 * @param rowFormatCtx the ROW FORMAT context, or null if the clause was not given
 * @param createFileFormatCtx the file format context, or null if the clause was not given
 * @param parentCtx the context passed to `operationNotAllowed` when an error is reported
 */
private def validateRowFormatFileFormat(
    rowFormatCtx: RowFormatContext,
    createFileFormatCtx: CreateFileFormatContext,
    parentCtx: ParserRuleContext): Unit = {
  // Lower-case with a fixed locale: with the JVM default locale, a Turkish environment maps
  // 'I' to a dotless 'ı', so "TEXTFILE" would not equal "textfile" and a valid statement
  // would be rejected.
  def formatName(ctx: GenericFileFormatContext): String =
    ctx.identifier.getText.toLowerCase(java.util.Locale.ROOT)

  if (rowFormatCtx != null && createFileFormatCtx != null) {
    (rowFormatCtx, createFileFormatCtx.fileFormat) match {
      case (_, _: TableFileFormatContext) => // OK
      case (_: RowFormatSerdeContext, ffGeneric: GenericFileFormatContext) =>
        formatName(ffGeneric) match {
          case "sequencefile" | "textfile" | "rcfile" => // OK
          case fmt =>
            throw operationNotAllowed(
              s"ROW FORMAT SERDE is incompatible with format '$fmt', which also specifies a serde",
              parentCtx)
        }
      case (_: RowFormatDelimitedContext, ffGeneric: GenericFileFormatContext) =>
        formatName(ffGeneric) match {
          case "textfile" => // OK
          case fmt =>
            throw operationNotAllowed(
              s"ROW FORMAT DELIMITED is only compatible with 'textfile', not '$fmt'", parentCtx)
        }
      case _ =>
        // should never happen
        def str(ctx: ParserRuleContext): String = {
          (0 until ctx.getChildCount).map { i => ctx.getChild(i).getText }.mkString(" ")
        }
        throw operationNotAllowed(
          s"Unexpected combination of ${str(rowFormatCtx)} and ${str(createFileFormatCtx)}",
          parentCtx)
    }
  }
}

/**
* Create or replace a view. This creates a [[CreateViewCommand]] command.
*
Expand Down
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.command

import scala.reflect.{classTag, ClassTag}

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, FunctionResource}
import org.apache.spark.sql.catalyst.catalog.FunctionResourceType
Expand All @@ -25,9 +27,10 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsing}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}


// TODO: merge this with DDLSuite (SPARK-14441)
class DDLCommandSuite extends PlanTest {
private val parser = new SparkSqlParser(new SQLConf)
Expand All @@ -40,6 +43,15 @@ class DDLCommandSuite extends PlanTest {
containsThesePhrases.foreach { p => assert(e.getMessage.toLowerCase.contains(p.toLowerCase)) }
}

/**
 * Parse `query` and return the resulting plan as a `T`.
 *
 * Fails the test if the parser produced a plan of any other type; the failure message
 * names the expected class, the actual class and the query.
 */
private def parseAs[T: ClassTag](query: String): T = {
  parser.parsePlan(query) match {
    case t: T => t
    case other =>
      // Use getName on both sides so the two class names are printed in the same form,
      // and keep a space after the comma so the two halves of the message do not run together.
      fail(s"Expected to parse ${classTag[T].runtimeClass.getName} from query, " +
        s"got ${other.getClass.getName}: $query")
  }
}

test("create database") {
val sql =
"""
Expand Down Expand Up @@ -225,19 +237,69 @@ class DDLCommandSuite extends PlanTest {
comparePlans(parsed4, expected4)
}

test("create table - row format and table file format") {
  val prefix = "CREATE TABLE my_tab ROW FORMAT"
  val storedAs = "STORED AS INPUTFORMAT 'inputfmt' OUTPUTFORMAT 'outputfmt'"
  val serdeQuery = s"$prefix SERDE 'anything' $storedAs"
  val delimitedQuery = s"$prefix DELIMITED FIELDS TERMINATED BY ' ' $storedAs"

  // Neither combination has conflicting serdes, so both must parse.
  // ROW FORMAT SERDE: the serde given by the user is kept alongside the explicit formats.
  val serdeStorage = parseAs[CreateTableCommand](serdeQuery).table.storage
  assert(serdeStorage.serde == Some("anything"))
  assert(serdeStorage.inputFormat == Some("inputfmt"))
  assert(serdeStorage.outputFormat == Some("outputfmt"))

  // ROW FORMAT DELIMITED: no serde is recorded, only the explicit formats.
  val delimitedStorage = parseAs[CreateTableCommand](delimitedQuery).table.storage
  assert(delimitedStorage.serde.isEmpty)
  assert(delimitedStorage.inputFormat == Some("inputfmt"))
  assert(delimitedStorage.outputFormat == Some("outputfmt"))
}

test("create table - row format serde and generic file format") {
  val candidates = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile")
  // Formats that may be combined with an explicit ROW FORMAT SERDE.
  val compatible = Set("sequencefile", "rcfile", "textfile")

  for (source <- candidates) {
    val query = s"CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS $source"
    if (!compatible(source)) {
      assertUnsupported(query, Seq("row format serde", "incompatible", source))
    } else {
      val storage = parseAs[CreateTableCommand](query).table.storage
      val expected = HiveSerDe.sourceToSerDe(source, new SQLConf)
      assert(expected.isDefined)
      // The user's serde is kept; the input/output formats come from the file format.
      assert(storage.serde == Some("anything"))
      expected.foreach { hiveSerde =>
        assert(storage.inputFormat == hiveSerde.inputFormat)
        assert(storage.outputFormat == hiveSerde.outputFormat)
      }
    }
  }
}

test("create table - row format delimited and generic file format") {
  val candidates = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile")
  // ROW FORMAT DELIMITED may only be combined with this format.
  val compatible = Set("textfile")
  val prefix = "CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS"

  for (source <- candidates) {
    val query = s"$prefix $source"
    if (!compatible(source)) {
      assertUnsupported(
        query, Seq("row format delimited", "only compatible with 'textfile'", source))
    } else {
      val storage = parseAs[CreateTableCommand](query).table.storage
      val expected = HiveSerDe.sourceToSerDe(source, new SQLConf)
      assert(expected.isDefined)
      // Serde and input/output formats should all match those of the file format.
      expected.foreach { hiveSerde =>
        assert(storage.serde == hiveSerde.serde)
        assert(storage.inputFormat == hiveSerde.inputFormat)
        assert(storage.outputFormat == hiveSerde.outputFormat)
      }
    }
  }
}

test("create external table - location must be specified") {
  // EXTERNAL without a LOCATION is rejected by the parser.
  assertUnsupported(
    sql = "CREATE EXTERNAL TABLE my_tab",
    containsThesePhrases = Seq("create external table", "location"))
  // With a LOCATION the statement parses; parseAs fails the test on any other plan type,
  // so the hand-written match (which also reported the wrong class name through
  // classOf[...].getClass) is not needed.
  val query = "CREATE EXTERNAL TABLE my_tab LOCATION '/something/anything'"
  val ct = parseAs[CreateTableCommand](query)
  assert(ct.table.tableType == CatalogTableType.EXTERNAL)
  assert(ct.table.storage.locationUri == Some("/something/anything"))
}

test("create table - property values must be set") {
Expand All @@ -252,14 +314,9 @@ class DDLCommandSuite extends PlanTest {

test("create table - location implies external") {
  // No EXTERNAL keyword here: the LOCATION clause alone should produce an external table.
  // parseAs fails the test on any other plan type, so the hand-written match (which also
  // reported the wrong class name through classOf[...].getClass) is not needed.
  val query = "CREATE TABLE my_tab LOCATION '/something/anything'"
  val ct = parseAs[CreateTableCommand](query)
  assert(ct.table.tableType == CatalogTableType.EXTERNAL)
  assert(ct.table.storage.locationUri == Some("/something/anything"))
}

test("create table using - with partitioned by") {
Expand Down Expand Up @@ -551,8 +608,7 @@ class DDLCommandSuite extends PlanTest {

test("alter table: set file format (not allowed)") {
assertUnsupported(
"ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " +
"OUTPUTFORMAT 'test' SERDE 'test'")
"ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' OUTPUTFORMAT 'test'")
assertUnsupported(
"ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
"SET FILEFORMAT PARQUET")
Expand Down
Expand Up @@ -61,7 +61,7 @@ class HiveDDLCommandSuite extends PlanTest {
|country STRING COMMENT 'country of origination')
|COMMENT 'This is the staging page view table'
|PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the day')
|ROW FORMAT DELIMITED FIELDS TERMINATED BY '\054' STORED AS RCFILE
|STORED AS RCFILE
|LOCATION '/user/external/page_view'
|TBLPROPERTIES ('p1'='v1', 'p2'='v2')
|AS SELECT * FROM src""".stripMargin
Expand All @@ -88,8 +88,6 @@ class HiveDDLCommandSuite extends PlanTest {
assert(desc.partitionColumns ==
CatalogColumn("dt", "string", comment = Some("date type")) ::
CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
assert(desc.storage.serdeProperties ==
Map((serdeConstants.SERIALIZATION_FORMAT, "\u002C"), (serdeConstants.FIELD_DELIM, "\u002C")))
assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
assert(desc.storage.serde ==
Expand Down

0 comments on commit c55a39c

Please sign in to comment.