
[SPARK-29839][SQL] Supporting STORED AS in CREATE TABLE LIKE

### What changes were proposed in this pull request?
In SPARK-29421 (#26097), we added support for specifying a different table provider for `CREATE TABLE LIKE` via `USING provider`.
Hive supports specifying a new file format in this statement with the `STORED AS` syntax:
```sql
CREATE TABLE tbl(a int) STORED AS TEXTFILE;
CREATE TABLE tbl2 LIKE tbl STORED AS PARQUET;
```
For Hive compatibility, we should also support `STORED AS` in `CREATE TABLE LIKE`.

### Why are the changes needed?
See #26097 (comment)

### Does this PR introduce any user-facing change?
Yes. It adds a new clause to the current CTL (`CREATE TABLE LIKE`) statement:
`CREATE TABLE tbl2 LIKE tbl [STORED AS hiveFormat];`

### How was this patch tested?
Added unit tests.

Closes #26466 from LantaoJin/SPARK-29839.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
LantaoJin authored and cloud-fan committed Dec 2, 2019
1 parent 169415f commit 04a5b8f5f80ee746bdc16267e44a993a9941d335
@@ -878,6 +878,7 @@ Spark SQL supports the vast majority of Hive features, such as:
* All Hive DDL Functions, including:
  * `CREATE TABLE`
  * `CREATE TABLE AS SELECT`
  * `CREATE TABLE LIKE`
  * `ALTER TABLE`
* Most Hive Data types, including:
  * `TINYINT`
@@ -121,7 +121,12 @@ statement
(TBLPROPERTIES tableProps=tablePropertyList))*
(AS? query)? #createHiveTable
| CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
LIKE source=tableIdentifier tableProvider? locationSpec? #createTableLike
LIKE source=tableIdentifier
(tableProvider |
rowFormat |
createFileFormat |
locationSpec |
(TBLPROPERTIES tableProps=tablePropertyList))* #createTableLike
| replaceTableHeader ('(' colTypeList ')')? tableProvider
((OPTIONS options=tablePropertyList) |
(PARTITIONED BY partitioning=transformList) |
@@ -540,15 +540,50 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
* For example:
* {{{
* CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
* LIKE [other_db_name.]existing_table_name [USING provider] [locationSpec]
* LIKE [other_db_name.]existing_table_name
* [USING provider |
* [
* [ROW FORMAT row_format]
* [STORED AS file_format] [WITH SERDEPROPERTIES (...)]
* ]
* ]
* [locationSpec]
* [TBLPROPERTIES (property_name=property_value, ...)]
* }}}
*/
override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) {
val targetTable = visitTableIdentifier(ctx.target)
val sourceTable = visitTableIdentifier(ctx.source)
val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
val location = Option(ctx.locationSpec).map(visitLocationSpec)
CreateTableLikeCommand(targetTable, sourceTable, provider, location, ctx.EXISTS != null)
checkDuplicateClauses(ctx.tableProvider, "PROVIDER", ctx)
checkDuplicateClauses(ctx.createFileFormat, "STORED AS/BY", ctx)
checkDuplicateClauses(ctx.rowFormat, "ROW FORMAT", ctx)
checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
val provider = ctx.tableProvider.asScala.headOption.map(_.multipartIdentifier.getText)
val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
// rowStorage is used to determine the CatalogStorageFormat.serde and
// CatalogStorageFormat.properties for the STORED AS clause.
val rowStorage = ctx.rowFormat.asScala.headOption.map(visitRowFormat)
.getOrElse(CatalogStorageFormat.empty)
val fileFormat = ctx.createFileFormat.asScala.headOption.map(visitCreateFileFormat) match {
case Some(f) =>
if (provider.isDefined) {
throw new ParseException("'STORED AS hiveFormats' and 'USING provider' " +
"should not both be specified", ctx)
}
f.copy(
locationUri = location.map(CatalogUtils.stringToURI),
serde = rowStorage.serde.orElse(f.serde),
properties = rowStorage.properties ++ f.properties)
case None =>
if (rowStorage.serde.isDefined) {
throw new ParseException("'ROW FORMAT' must be used with 'STORED AS'", ctx)
}
CatalogStorageFormat.empty.copy(locationUri = location.map(CatalogUtils.stringToURI))
}
val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
CreateTableLikeCommand(
targetTable, sourceTable, fileFormat, provider, properties, ctx.EXISTS != null)
}

/**
@@ -53,28 +53,37 @@ import org.apache.spark.sql.util.SchemaUtils
* are identical to the ones defined in the source table.
*
* The CatalogTable attributes copied from the source table are storage(inputFormat, outputFormat,
* serde, compressed, properties), schema, provider, partitionColumnNames, bucketSpec.
* serde, compressed, properties), schema, provider, partitionColumnNames, bucketSpec by default.
*
* Use "CREATE TABLE t1 LIKE t2 USING file_format"
* to specify new file format for t1 from a data source table t2.
* Use "CREATE TABLE t1 LIKE t2 USING file_format" to specify new provider for t1.
* For Hive compatibility, use "CREATE TABLE t1 LIKE t2 STORED AS hiveFormat"
* to specify new file storage format (inputFormat, outputFormat, serde) for t1.
*
* The syntax of using this command in SQL is:
* {{{
* CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
* LIKE [other_db_name.]existing_table_name [USING provider] [locationSpec]
* LIKE [other_db_name.]existing_table_name
* [USING provider |
* [
* [ROW FORMAT row_format]
* [STORED AS file_format] [WITH SERDEPROPERTIES (...)]
* ]
* ]
* [locationSpec]
* [TBLPROPERTIES (property_name=property_value, ...)]
* }}}
*/
case class CreateTableLikeCommand(
targetTable: TableIdentifier,
sourceTable: TableIdentifier,
fileFormat: CatalogStorageFormat,
provider: Option[String],
location: Option[String],
properties: Map[String, String] = Map.empty,
ifNotExists: Boolean) extends RunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val sourceTableDesc = catalog.getTempViewOrPermanentTableMetadata(sourceTable)

val newProvider = if (provider.isDefined) {
if (!DDLUtils.isHiveTable(provider)) {
// check the validation of provider input, invalid provider will throw
@@ -84,24 +93,36 @@ case class CreateTableLikeCommand(
provider
} else if (sourceTableDesc.tableType == CatalogTableType.VIEW) {
Some(sparkSession.sessionState.conf.defaultDataSourceName)
} else if (fileFormat.inputFormat.isDefined) {
Some(DDLUtils.HIVE_PROVIDER)
} else {
sourceTableDesc.provider
}

val newStorage = if (fileFormat.inputFormat.isDefined) {
fileFormat
} else {
sourceTableDesc.storage.copy(locationUri = fileFormat.locationUri)
}

// If the location is specified, we create an external table internally.
// Otherwise create a managed table.
val tblType = if (location.isEmpty) CatalogTableType.MANAGED else CatalogTableType.EXTERNAL
val tblType = if (newStorage.locationUri.isEmpty) {
CatalogTableType.MANAGED
} else {
CatalogTableType.EXTERNAL
}

val newTableDesc =
CatalogTable(
identifier = targetTable,
tableType = tblType,
storage = sourceTableDesc.storage.copy(
locationUri = location.map(CatalogUtils.stringToURI(_))),
storage = newStorage,
schema = sourceTableDesc.schema,
provider = newProvider,
partitionColumnNames = sourceTableDesc.partitionColumnNames,
bucketSpec = sourceTableDesc.bucketSpec)
bucketSpec = sourceTableDesc.bucketSpec,
properties = properties)

catalog.createTable(newTableDesc, ifNotExists)
Seq.empty[Row]
@@ -974,75 +974,81 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {

test("create table like") {
val v1 = "CREATE TABLE table1 LIKE table2"
val (target, source, provider, location, exists) = parser.parsePlan(v1).collect {
case CreateTableLikeCommand(t, s, p, l, allowExisting) => (t, s, p, l, allowExisting)
}.head
val (target, source, fileFormat, provider, properties, exists) =
parser.parsePlan(v1).collect {
case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e)
}.head
assert(exists == false)
assert(target.database.isEmpty)
assert(target.table == "table1")
assert(source.database.isEmpty)
assert(source.table == "table2")
assert(location.isEmpty)
assert(fileFormat.locationUri.isEmpty)
assert(provider.isEmpty)

val v2 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2"
val (target2, source2, provider2, location2, exists2) = parser.parsePlan(v2).collect {
case CreateTableLikeCommand(t, s, p, l, allowExisting) => (t, s, p, l, allowExisting)
}.head
val (target2, source2, fileFormat2, provider2, properties2, exists2) =
parser.parsePlan(v2).collect {
case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e)
}.head
assert(exists2)
assert(target2.database.isEmpty)
assert(target2.table == "table1")
assert(source2.database.isEmpty)
assert(source2.table == "table2")
assert(location2.isEmpty)
assert(fileFormat2.locationUri.isEmpty)
assert(provider2.isEmpty)

val v3 = "CREATE TABLE table1 LIKE table2 LOCATION '/spark/warehouse'"
val (target3, source3, provider3, location3, exists3) = parser.parsePlan(v3).collect {
case CreateTableLikeCommand(t, s, p, l, allowExisting) => (t, s, p, l, allowExisting)
}.head
val (target3, source3, fileFormat3, provider3, properties3, exists3) =
parser.parsePlan(v3).collect {
case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e)
}.head
assert(!exists3)
assert(target3.database.isEmpty)
assert(target3.table == "table1")
assert(source3.database.isEmpty)
assert(source3.table == "table2")
assert(location3 == Some("/spark/warehouse"))
assert(fileFormat3.locationUri.map(_.toString) == Some("/spark/warehouse"))
assert(provider3.isEmpty)

val v4 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2 LOCATION '/spark/warehouse'"
val (target4, source4, provider4, location4, exists4) = parser.parsePlan(v4).collect {
case CreateTableLikeCommand(t, s, p, l, allowExisting) => (t, s, p, l, allowExisting)
}.head
val (target4, source4, fileFormat4, provider4, properties4, exists4) =
parser.parsePlan(v4).collect {
case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e)
}.head
assert(exists4)
assert(target4.database.isEmpty)
assert(target4.table == "table1")
assert(source4.database.isEmpty)
assert(source4.table == "table2")
assert(location4 == Some("/spark/warehouse"))
assert(fileFormat4.locationUri.map(_.toString) == Some("/spark/warehouse"))
assert(provider4.isEmpty)

val v5 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2 USING parquet"
val (target5, source5, provider5, location5, exists5) = parser.parsePlan(v5).collect {
case CreateTableLikeCommand(t, s, p, l, allowExisting) => (t, s, p, l, allowExisting)
}.head
val (target5, source5, fileFormat5, provider5, properties5, exists5) =
parser.parsePlan(v5).collect {
case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e)
}.head
assert(exists5)
assert(target5.database.isEmpty)
assert(target5.table == "table1")
assert(source5.database.isEmpty)
assert(source5.table == "table2")
assert(location5.isEmpty)
assert(fileFormat5.locationUri.isEmpty)
assert(provider5 == Some("parquet"))

val v6 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2 USING ORC"
val (target6, source6, provider6, location6, exists6) = parser.parsePlan(v6).collect {
case CreateTableLikeCommand(t, s, p, l, allowExisting) => (t, s, p, l, allowExisting)
}.head
val (target6, source6, fileFormat6, provider6, properties6, exists6) =
parser.parsePlan(v6).collect {
case CreateTableLikeCommand(t, s, f, p, pr, e) => (t, s, f, p, pr, e)
}.head
assert(exists6)
assert(target6.database.isEmpty)
assert(target6.table == "table1")
assert(source6.database.isEmpty)
assert(source6.table == "table2")
assert(location6.isEmpty)
assert(fileFormat6.locationUri.isEmpty)
assert(provider6 == Some("ORC"))
}
}
@@ -2564,4 +2564,104 @@ class HiveDDLSuite
}
}
}

test("Create Table LIKE STORED AS Hive Format") {
val catalog = spark.sessionState.catalog
withTable("s") {
sql("CREATE TABLE s(a INT, b INT) STORED AS ORC")
hiveFormats.foreach { tableType =>
val expectedSerde = HiveSerDe.sourceToSerDe(tableType)
withTable("t") {
sql(s"CREATE TABLE t LIKE s STORED AS $tableType")
val table = catalog.getTableMetadata(TableIdentifier("t"))
assert(table.provider == Some("hive"))
assert(table.storage.serde == expectedSerde.get.serde)
assert(table.storage.inputFormat == expectedSerde.get.inputFormat)
assert(table.storage.outputFormat == expectedSerde.get.outputFormat)
}
}
}
}

test("Create Table LIKE with specified TBLPROPERTIES") {
val catalog = spark.sessionState.catalog
withTable("s", "t") {
sql("CREATE TABLE s(a INT, b INT) USING hive TBLPROPERTIES('a'='apple')")
val source = catalog.getTableMetadata(TableIdentifier("s"))
assert(source.properties("a") == "apple")
sql("CREATE TABLE t LIKE s STORED AS parquet TBLPROPERTIES('f'='foo', 'b'='bar')")
val table = catalog.getTableMetadata(TableIdentifier("t"))
assert(table.properties.get("a") === None)
assert(table.properties("f") == "foo")
assert(table.properties("b") == "bar")
}
}

test("Create Table LIKE with row format") {
val catalog = spark.sessionState.catalog
withTable("sourceHiveTable", "sourceDsTable", "targetHiveTable1", "targetHiveTable2") {
sql("CREATE TABLE sourceHiveTable(a INT, b INT) STORED AS PARQUET")
sql("CREATE TABLE sourceDsTable(a INT, b INT) USING PARQUET")

// ROW FORMAT doesn't work when creating targetDsTable with a data source provider
var e = intercept[AnalysisException] {
spark.sql(
"""
|CREATE TABLE targetDsTable LIKE sourceHiveTable USING PARQUET
|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
""".stripMargin)
}.getMessage
assert(e.contains("'ROW FORMAT' must be used with 'STORED AS'"))

// row format doesn't work with provider hive
e = intercept[AnalysisException] {
spark.sql(
"""
|CREATE TABLE targetHiveTable LIKE sourceHiveTable USING hive
|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
|WITH SERDEPROPERTIES ('test' = 'test')
""".stripMargin)
}.getMessage
assert(e.contains("'ROW FORMAT' must be used with 'STORED AS'"))

// row format doesn't work without 'STORED AS'
e = intercept[AnalysisException] {
spark.sql(
"""
|CREATE TABLE targetDsTable LIKE sourceDsTable
|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
|WITH SERDEPROPERTIES ('test' = 'test')
""".stripMargin)
}.getMessage
assert(e.contains("'ROW FORMAT' must be used with 'STORED AS'"))

// row format works with STORED AS hive format (from hive table)
spark.sql(
"""
|CREATE TABLE targetHiveTable1 LIKE sourceHiveTable STORED AS PARQUET
|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
|WITH SERDEPROPERTIES ('test' = 'test')
""".stripMargin)
var table = catalog.getTableMetadata(TableIdentifier("targetHiveTable1"))
assert(table.provider === Some("hive"))
assert(table.storage.inputFormat ===
Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
assert(table.storage.serde === Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
assert(table.storage.properties("test") == "test")

// row format works with STORED AS hive format (from datasource table)
spark.sql(
"""
|CREATE TABLE targetHiveTable2 LIKE sourceDsTable STORED AS PARQUET
|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
|WITH SERDEPROPERTIES ('test' = 'test')
""".stripMargin)
table = catalog.getTableMetadata(TableIdentifier("targetHiveTable2"))
assert(table.provider === Some("hive"))
assert(table.storage.inputFormat ===
Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
assert(table.storage.serde === Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
assert(table.storage.properties("test") == "test")
}
}
}
