[SPARK-15279][SQL] Catch conflicting SerDe when creating table #13068

Closed
wants to merge 10 commits
@@ -771,6 +771,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
compressed = false,
serdeProperties = Map())
}
validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
.getOrElse(EmptyStorageFormat)
val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat)
@@ -827,11 +828,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {

/**
* Create a [[CatalogStorageFormat]] for creating tables.
*
* Format: STORED AS ...
*/
override def visitCreateFileFormat(
ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
(ctx.fileFormat, ctx.storageHandler) match {
- // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format
+ // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format (SERDE serde)
case (c: TableFileFormatContext, null) =>
visitTableFileFormat(c)
// Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO
@@ -940,6 +943,43 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
EmptyStorageFormat.copy(serdeProperties = entries.toMap)
}

/**
* Throw a [[ParseException]] if the user specified incompatible SerDes through ROW FORMAT
* and STORED AS.
*/
private def validateRowFormatFileFormat(
rowFormatCtx: RowFormatContext,
createFileFormatCtx: CreateFileFormatContext,
parentCtx: ParserRuleContext): Unit = {
Contributor: (Maybe it is good to add a few examples to this?)

if (rowFormatCtx == null || createFileFormatCtx == null) {
return
}
val cff = (0 until createFileFormatCtx.getChildCount)
.map { i => createFileFormatCtx.getChild(i).getText }
.mkString(" ")
(rowFormatCtx, createFileFormatCtx.fileFormat) match {
case (_, ffTable: TableFileFormatContext) =>
if (visitTableFileFormat(ffTable).serde.isDefined) {
throw operationNotAllowed(s"ROW FORMAT is not compatible with $cff", parentCtx)
Contributor: Maybe we can be explicit that SERDE is not allowed for this case? By the way, should we remove the SERDE part from INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING (SERDE serdeCls=STRING)? in SqlBase.g4?

Contributor (author): yeah, I like that actually! Hive doesn't support STORED AS serde either: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL. I actually have no idea why we accept it.

Contributor: Why don't we support this? What's the problem? Can you give me an example? Thanks.

}
case (rfSerde: RowFormatSerdeContext, ffGeneric: GenericFileFormatContext) =>
ffGeneric.identifier.getText.toLowerCase match {
case ("sequencefile" | "textfile" | "rcfile") => // OK
case _ => throw operationNotAllowed(
s"ROW FORMAT SERDE is not compatible with $cff", parentCtx)
Contributor: Maybe we can say something like "Format ${ffGeneric.identifier} does not support a user-specified SerDe"?

}
case (rfDelimited: RowFormatDelimitedContext, ffGeneric: GenericFileFormatContext) =>
ffGeneric.identifier.getText.toLowerCase match {
case "textfile" => // OK
case _ => throw operationNotAllowed(
s"ROW FORMAT SERDE is not compatible with $cff", parentCtx)
Contributor: ROW FORMAT DELIMITED is only allowed with textfile?

Contributor (author): isn't that what we talked about?

Contributor: Oh, sorry. I was thinking about what the error message should say. Yes, the semantics are what we talked about.

}
case (rf, ff) =>
// should never happen
throw operationNotAllowed(s"Unexpected combination of ROW FORMAT and $cff", parentCtx)
Contributor (@yhuai, May 12, 2016): Should we also print out rf? Is ff the same as cff?

Contributor (author): cff is the nicely formatted string.

}
}
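
For illustration, a minimal sketch of the combinations this check accepts and rejects, written in the style of the DDLCommandSuite helpers added later in this diff (the table name and the SerDe/format strings are placeholders, and parseAs/intercept belong to the test suite, not this file):

// Accepted: a user SerDe alongside explicit INPUTFORMAT/OUTPUTFORMAT (no SERDE in STORED AS)
parseAs[CreateTable](
  "CREATE TABLE t ROW FORMAT SERDE 'anything' " +
    "STORED AS INPUTFORMAT 'inputfmt' OUTPUTFORMAT 'outputfmt'")
// Accepted: a user SerDe with sequencefile, rcfile, or textfile
parseAs[CreateTable]("CREATE TABLE t ROW FORMAT SERDE 'anything' STORED AS TEXTFILE")
// Rejected: these generic formats carry fixed SerDes
intercept[ParseException] {
  parser.parsePlan("CREATE TABLE t ROW FORMAT SERDE 'anything' STORED AS PARQUET")
}
// Rejected: ROW FORMAT DELIMITED only makes sense for textfile
intercept[ParseException] {
  parser.parsePlan(
    "CREATE TABLE t ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS ORC")
}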

/**
* Create or replace a view. This creates a [[CreateViewCommand]] command.
*
@@ -17,14 +17,16 @@

package org.apache.spark.sql.execution.command

import scala.reflect.{classTag, ClassTag}

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, FunctionResource}
import org.apache.spark.sql.catalyst.catalog.FunctionResourceType
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.SparkSqlParser
- import org.apache.spark.sql.internal.SQLConf
+ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}


// TODO: merge this with DDLSuite (SPARK-14441)
@@ -39,6 +41,15 @@ class DDLCommandSuite extends PlanTest {
containsThesePhrases.foreach { p => assert(e.getMessage.toLowerCase.contains(p.toLowerCase)) }
}

// Parse the given query and expect a plan of type T; the ClassTag bound lets
// the "case t: T" type test below survive erasure.
private def parseAs[T: ClassTag](query: String): T = {
parser.parsePlan(query) match {
case t: T => t
case other =>
fail(s"Expected to parse ${classTag[T].runtimeClass} from query," +
s"got ${other.getClass.getName}: $query")
}
}
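
For example, a hypothetical call, mirroring the external-table tests further down (the query string is illustrative):

val ct = parseAs[CreateTable]("CREATE TABLE my_tab LOCATION '/something/anything'")
assert(ct.table.tableType == CatalogTableType.EXTERNAL)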

test("create database") {
val sql =
"""
@@ -212,31 +223,83 @@
comparePlans(parsed4, expected4)
}

test("create table - row format and table file format") {
val createTableStart = "CREATE TABLE my_tab ROW FORMAT"
val fileFormat = s"STORED AS INPUTFORMAT 'inputfmt' OUTPUTFORMAT 'outputfmt'"
val fileFormatWithSerde = fileFormat + " SERDE 'myserde'"
val query1 = s"$createTableStart SERDE 'anything' $fileFormat"
val query2 = s"$createTableStart SERDE 'anything' $fileFormatWithSerde"
val query3 = s"$createTableStart DELIMITED FIELDS TERMINATED BY ' ' $fileFormat"
val query4 = s"$createTableStart DELIMITED FIELDS TERMINATED BY ' ' $fileFormatWithSerde"

// No conflicting serdes here, OK
val parsed1 = parseAs[CreateTable](query1)
assert(parsed1.table.storage.serde == Some("anything"))
assert(parsed1.table.storage.inputFormat == Some("inputfmt"))
assert(parsed1.table.storage.outputFormat == Some("outputfmt"))
val parsed3 = parseAs[CreateTable](query3)
assert(parsed3.table.storage.serde.isEmpty)
assert(parsed3.table.storage.inputFormat == Some("inputfmt"))
assert(parsed3.table.storage.outputFormat == Some("outputfmt"))

// File format specified a SerDe, not OK
assertUnsupported(query2, Seq("row format", "not compatible", "stored as", "myserde"))
assertUnsupported(query4, Seq("row format", "not compatible", "stored as", "myserde"))
}

test("create table - row format serde and generic file format") {
val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile")
val supportedSources = Set("sequencefile", "rcfile", "textfile")

allSources.foreach { s =>
val query = s"CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS $s"
if (supportedSources.contains(s)) {
val ct = parseAs[CreateTable](query)
val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
assert(hiveSerde.isDefined)
assert(ct.table.storage.serde == Some("anything"))
assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat)
assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat)
} else {
assertUnsupported(query, Seq("row format", "not compatible", s"stored as $s"))
}
}
}
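
For context, a sketch of what HiveSerDe.sourceToSerDe resolves for one of these short names; the RCFile class names are the ones asserted in HiveDDLCommandSuite at the end of this diff (other formats map analogously):

val rc = HiveSerDe.sourceToSerDe("rcfile", new SQLConf)
assert(rc.flatMap(_.inputFormat) == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
assert(rc.flatMap(_.outputFormat) == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))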

test("create table - row format delimited and generic file format") {
val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile")
val supportedSources = Set("textfile")

allSources.foreach { s =>
val query = s"CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS $s"
if (supportedSources.contains(s)) {
val ct = parseAs[CreateTable](query)
val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
assert(hiveSerde.isDefined)
assert(ct.table.storage.serde == hiveSerde.get.serde)
assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat)
assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat)
} else {
assertUnsupported(query, Seq("row format", "not compatible", s"stored as $s"))
}
}
}
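
As a sketch of what the accepted textfile case produces: ROW FORMAT DELIMITED clauses are expected to surface as SerDe properties (the field.delim key matches serdeConstants.FIELD_DELIM asserted elsewhere in these suites; the exact property set is an assumption):

val ct = parseAs[CreateTable](
  "CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE")
assert(ct.table.storage.serdeProperties.get("field.delim") == Some(","))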

test("create external table - location must be specified") {
assertUnsupported(
sql = "CREATE EXTERNAL TABLE my_tab",
containsThesePhrases = Seq("create external table", "location"))
val query = "CREATE EXTERNAL TABLE my_tab LOCATION '/something/anything'"
- parser.parsePlan(query) match {
- case ct: CreateTable =>
- assert(ct.table.tableType == CatalogTableType.EXTERNAL)
- assert(ct.table.storage.locationUri == Some("/something/anything"))
- case other =>
- fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," +
- s"got ${other.getClass.getName}: $query")
- }
+ val ct = parseAs[CreateTable](query)
+ assert(ct.table.tableType == CatalogTableType.EXTERNAL)
+ assert(ct.table.storage.locationUri == Some("/something/anything"))
}

test("create table - location implies external") {
val query = "CREATE TABLE my_tab LOCATION '/something/anything'"
- parser.parsePlan(query) match {
- case ct: CreateTable =>
- assert(ct.table.tableType == CatalogTableType.EXTERNAL)
- assert(ct.table.storage.locationUri == Some("/something/anything"))
- case other =>
- fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," +
- s"got ${other.getClass.getName}: $query")
- }
+ val ct = parseAs[CreateTable](query)
+ assert(ct.table.tableType == CatalogTableType.EXTERNAL)
+ assert(ct.table.storage.locationUri == Some("/something/anything"))
}

// ALTER TABLE table_name RENAME TO new_table_name;
@@ -61,7 +61,7 @@ class HiveDDLCommandSuite extends PlanTest {
|country STRING COMMENT 'country of origination')
|COMMENT 'This is the staging page view table'
|PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the day')
- |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\054' STORED AS RCFILE
+ |STORED AS RCFILE
|LOCATION '/user/external/page_view'
|TBLPROPERTIES ('p1'='v1', 'p2'='v2')
|AS SELECT * FROM src""".stripMargin
@@ -88,8 +88,6 @@ class HiveDDLCommandSuite extends PlanTest {
assert(desc.partitionColumns ==
CatalogColumn("dt", "string", comment = Some("date type")) ::
CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
- assert(desc.storage.serdeProperties ==
- Map((serdeConstants.SERIALIZATION_FORMAT, "\u002C"), (serdeConstants.FIELD_DELIM, "\u002C")))
assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
assert(desc.storage.serde ==