Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-15279][SQL] Catch conflicting SerDe when creating table #13068

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -266,8 +266,8 @@ createFileFormat
;

fileFormat
: INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING (SERDE serdeCls=STRING)? #tableFileFormat
| identifier #genericFileFormat
: INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING #tableFileFormat
| identifier #genericFileFormat
;

storageHandler
Expand Down
Expand Up @@ -781,14 +781,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
*
* Expected format:
* {{{
* CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
* [(col1 data_type [COMMENT col_comment], ...)]
* CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
* [(col1[:] data_type [COMMENT col_comment], ...)]
* [COMMENT table_comment]
* [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)]
* [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS]
* [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...) [STORED AS DIRECTORIES]]
* [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
* [ROW FORMAT row_format]
* [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]]
* [STORED AS file_format]
* [LOCATION path]
* [TBLPROPERTIES (property_name=property_value, ...)]
* [AS select_statement];
Expand Down Expand Up @@ -834,6 +832,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
compressed = false,
serdeProperties = Map())
}
validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
.getOrElse(EmptyStorageFormat)
val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat)
Expand Down Expand Up @@ -890,6 +889,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {

/**
* Create a [[CatalogStorageFormat]] for creating tables.
*
* Format: STORED AS ...
*/
override def visitCreateFileFormat(
ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
Expand Down Expand Up @@ -917,9 +918,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
EmptyStorageFormat.copy(
inputFormat = Option(string(ctx.inFmt)),
outputFormat = Option(string(ctx.outFmt)),
serde = Option(ctx.serdeCls).map(string)
)
outputFormat = Option(string(ctx.outFmt)))
}

/**
Expand Down Expand Up @@ -1003,6 +1002,49 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
EmptyStorageFormat.copy(serdeProperties = entries.toMap)
}

/**
 * Throw a [[ParseException]] if the user specified incompatible SerDes through ROW FORMAT
 * and STORED AS.
 *
 * The following are allowed. Anything else is not:
 *   ROW FORMAT SERDE ... STORED AS [SEQUENCEFILE | RCFILE | TEXTFILE]
 *   ROW FORMAT DELIMITED ... STORED AS TEXTFILE
 *   ROW FORMAT ... STORED AS INPUTFORMAT ... OUTPUTFORMAT ...
 *
 * Examples:
 * {{{
 *   -- accepted
 *   CREATE TABLE t ROW FORMAT SERDE 'my.Serde' STORED AS SEQUENCEFILE
 *   CREATE TABLE t ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE
 *   CREATE TABLE t ROW FORMAT SERDE 'my.Serde' STORED AS INPUTFORMAT 'in' OUTPUTFORMAT 'out'
 *
 *   -- rejected
 *   CREATE TABLE t ROW FORMAT SERDE 'my.Serde' STORED AS PARQUET
 *   CREATE TABLE t ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS RCFILE
 * }}}
 *
 * Nothing is checked when either clause is missing, i.e. when its context is null. The
 * generic file format name is matched case-insensitively; it is lower-cased with
 * `Locale.ROOT` so the comparison does not depend on the JVM default locale.
 *
 * @param rowFormatCtx parse context of the ROW FORMAT clause, or null if absent
 * @param createFileFormatCtx parse context of the STORED AS clause, or null if absent
 * @param parentCtx context passed to `operationNotAllowed` when reporting a conflict
 */
private def validateRowFormatFileFormat(
    rowFormatCtx: RowFormatContext,
    createFileFormatCtx: CreateFileFormatContext,
    parentCtx: ParserRuleContext): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Maybe it is good to add a few examples to this?)

  // Only a combination of both clauses can conflict.
  if (rowFormatCtx == null || createFileFormatCtx == null) {
    return
  }
  // Lower-case with a fixed locale: the default-locale variant maps 'I' to a dotless 'ı'
  // under e.g. a Turkish locale, which would make "TEXTFILE" fail the comparisons below.
  def formatName(ctx: GenericFileFormatContext): String = {
    ctx.identifier.getText.toLowerCase(java.util.Locale.ROOT)
  }
  (rowFormatCtx, createFileFormatCtx.fileFormat) match {
    // Explicit INPUTFORMAT/OUTPUTFORMAT is compatible with any ROW FORMAT.
    case (_, _: TableFileFormatContext) => // OK
    case (_: RowFormatSerdeContext, ffGeneric: GenericFileFormatContext) =>
      formatName(ffGeneric) match {
        case ("sequencefile" | "textfile" | "rcfile") => // OK
        case fmt =>
          throw operationNotAllowed(
            s"ROW FORMAT SERDE is incompatible with format '$fmt', which also specifies a serde",
            parentCtx)
      }
    case (_: RowFormatDelimitedContext, ffGeneric: GenericFileFormatContext) =>
      formatName(ffGeneric) match {
        case "textfile" => // OK
        case fmt => throw operationNotAllowed(
          s"ROW FORMAT DELIMITED is only compatible with 'textfile', not '$fmt'", parentCtx)
      }
    case _ =>
      // should never happen
      def str(ctx: ParserRuleContext): String = {
        (0 until ctx.getChildCount).map { i => ctx.getChild(i).getText }.mkString(" ")
      }
      throw operationNotAllowed(
        s"Unexpected combination of ${str(rowFormatCtx)} and ${str(createFileFormatCtx)}",
        parentCtx)
  }
}

/**
* Create or replace a view. This creates a [[CreateViewCommand]] command.
*
Expand Down
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.command

import scala.reflect.{classTag, ClassTag}

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, FunctionResource}
import org.apache.spark.sql.catalyst.catalog.FunctionResourceType
Expand All @@ -25,9 +27,10 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsing}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}


// TODO: merge this with DDLSuite (SPARK-14441)
class DDLCommandSuite extends PlanTest {
private val parser = new SparkSqlParser(new SQLConf)
Expand All @@ -40,6 +43,15 @@ class DDLCommandSuite extends PlanTest {
containsThesePhrases.foreach { p => assert(e.getMessage.toLowerCase.contains(p.toLowerCase)) }
}

/**
 * Parse `query` with the suite's parser and return the resulting plan as a `T`.
 *
 * Fails the test, reporting both the expected and the actual class, when the parsed plan
 * is not an instance of `T`. The `ClassTag` context bound is what makes the `case t: T`
 * type test effective at runtime.
 *
 * @param query SQL text to parse
 * @tparam T expected type of the parsed plan
 * @return the parsed plan, typed as `T`
 */
private def parseAs[T: ClassTag](query: String): T = {
  parser.parsePlan(query) match {
    case t: T => t
    case other =>
      // Note the trailing space: the two halves are concatenated into one sentence.
      fail(s"Expected to parse ${classTag[T].runtimeClass} from query, " +
        s"got ${other.getClass.getName}: $query")
  }
}

test("create database") {
val sql =
"""
Expand Down Expand Up @@ -225,19 +237,69 @@ class DDLCommandSuite extends PlanTest {
comparePlans(parsed4, expected4)
}

test("create table - row format and table file format") {
  // An explicit INPUTFORMAT/OUTPUTFORMAT pair names no serde, so it can be combined with
  // either flavour of ROW FORMAT.
  val prefix = "CREATE TABLE my_tab ROW FORMAT"
  val storedAs = "STORED AS INPUTFORMAT 'inputfmt' OUTPUTFORMAT 'outputfmt'"
  val serdeQuery = s"$prefix SERDE 'anything' $storedAs"
  val delimitedQuery = s"$prefix DELIMITED FIELDS TERMINATED BY ' ' $storedAs"

  // ROW FORMAT SERDE: the user's serde is kept alongside the explicit formats.
  val serdeStorage = parseAs[CreateTableCommand](serdeQuery).table.storage
  assert(serdeStorage.serde == Some("anything"))
  assert(serdeStorage.inputFormat == Some("inputfmt"))
  assert(serdeStorage.outputFormat == Some("outputfmt"))

  // ROW FORMAT DELIMITED: no serde is set, the explicit formats are kept.
  val delimitedStorage = parseAs[CreateTableCommand](delimitedQuery).table.storage
  assert(delimitedStorage.serde.isEmpty)
  assert(delimitedStorage.inputFormat == Some("inputfmt"))
  assert(delimitedStorage.outputFormat == Some("outputfmt"))
}

test("create table - row format serde and generic file format") {
  // Formats that may be combined with a user-supplied ROW FORMAT SERDE.
  val compatible = Set("sequencefile", "rcfile", "textfile")
  val candidates = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile")

  for (source <- candidates) {
    val query = s"CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS $source"
    if (!compatible(source)) {
      assertUnsupported(query, Seq("row format serde", "incompatible", source))
    } else {
      val storage = parseAs[CreateTableCommand](query).table.storage
      val expected = HiveSerDe.sourceToSerDe(source, new SQLConf)
      assert(expected.isDefined)
      // The user's serde is kept; the input/output formats come from the file format.
      assert(storage.serde == Some("anything"))
      assert(storage.inputFormat == expected.get.inputFormat)
      assert(storage.outputFormat == expected.get.outputFormat)
    }
  }
}

test("create table - row format delimited and generic file format") {
  // ROW FORMAT DELIMITED may only be paired with TEXTFILE.
  val compatible = Set("textfile")
  val candidates = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile")

  for (source <- candidates) {
    val query =
      "CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' " + s"STORED AS $source"
    if (!compatible(source)) {
      assertUnsupported(
        query,
        Seq("row format delimited", "only compatible with 'textfile'", source))
    } else {
      val storage = parseAs[CreateTableCommand](query).table.storage
      val expected = HiveSerDe.sourceToSerDe(source, new SQLConf)
      assert(expected.isDefined)
      // Serde and input/output formats are all expected to match the file format's own.
      assert(storage.serde == expected.get.serde)
      assert(storage.inputFormat == expected.get.inputFormat)
      assert(storage.outputFormat == expected.get.outputFormat)
    }
  }
}

test("create external table - location must be specified") {
  // Without LOCATION the statement must be rejected.
  assertUnsupported(
    sql = "CREATE EXTERNAL TABLE my_tab",
    containsThesePhrases = Seq("create external table", "location"))
  // With LOCATION it parses to an EXTERNAL table that records the given path.
  // parseAs already fails the test with a descriptive message on an unexpected plan
  // type, so no hand-written match on the parsed plan is needed.
  val query = "CREATE EXTERNAL TABLE my_tab LOCATION '/something/anything'"
  val ct = parseAs[CreateTableCommand](query)
  assert(ct.table.tableType == CatalogTableType.EXTERNAL)
  assert(ct.table.storage.locationUri == Some("/something/anything"))
}

test("create table - property values must be set") {
Expand All @@ -252,14 +314,9 @@ class DDLCommandSuite extends PlanTest {

test("create table - location implies external") {
  // A LOCATION clause alone, without the EXTERNAL keyword, yields an EXTERNAL table.
  // parseAs already fails the test with a descriptive message on an unexpected plan
  // type, so no hand-written match on the parsed plan is needed.
  val query = "CREATE TABLE my_tab LOCATION '/something/anything'"
  val ct = parseAs[CreateTableCommand](query)
  assert(ct.table.tableType == CatalogTableType.EXTERNAL)
  assert(ct.table.storage.locationUri == Some("/something/anything"))
}

test("create table using - with partitioned by") {
Expand Down Expand Up @@ -551,8 +608,7 @@ class DDLCommandSuite extends PlanTest {

test("alter table: set file format (not allowed)") {
assertUnsupported(
"ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " +
"OUTPUTFORMAT 'test' SERDE 'test'")
"ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' OUTPUTFORMAT 'test'")
assertUnsupported(
"ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
"SET FILEFORMAT PARQUET")
Expand Down
Expand Up @@ -61,7 +61,7 @@ class HiveDDLCommandSuite extends PlanTest {
|country STRING COMMENT 'country of origination')
|COMMENT 'This is the staging page view table'
|PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the day')
|ROW FORMAT DELIMITED FIELDS TERMINATED BY '\054' STORED AS RCFILE
|STORED AS RCFILE
|LOCATION '/user/external/page_view'
|TBLPROPERTIES ('p1'='v1', 'p2'='v2')
|AS SELECT * FROM src""".stripMargin
Expand All @@ -88,8 +88,6 @@ class HiveDDLCommandSuite extends PlanTest {
assert(desc.partitionColumns ==
CatalogColumn("dt", "string", comment = Some("date type")) ::
CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
assert(desc.storage.serdeProperties ==
Map((serdeConstants.SERIALIZATION_FORMAT, "\u002C"), (serdeConstants.FIELD_DELIM, "\u002C")))
assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
assert(desc.storage.serde ==
Expand Down