diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 0e2cd39448bba..a937ad1eb78be 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -272,8 +272,7 @@ createFileFormat ; fileFormat - : INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING (SERDE serdeCls=STRING)? - (INPUTDRIVER inDriver=STRING OUTPUTDRIVER outDriver=STRING)? #tableFileFormat + : INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING (SERDE serdeCls=STRING)? #tableFileFormat | identifier #genericFileFormat ; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 4ef59316ceb27..ad989a97e4afa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -220,14 +220,30 @@ case class CatalogTable( tableType: CatalogTableType, storage: CatalogStorageFormat, schema: Seq[CatalogColumn], - partitionColumns: Seq[CatalogColumn] = Seq.empty, - sortColumns: Seq[CatalogColumn] = Seq.empty, - numBuckets: Int = 0, + partitionColumnNames: Seq[String] = Seq.empty, + sortColumnNames: Seq[String] = Seq.empty, + bucketColumnNames: Seq[String] = Seq.empty, + numBuckets: Int = -1, createTime: Long = System.currentTimeMillis, - lastAccessTime: Long = System.currentTimeMillis, + lastAccessTime: Long = -1, properties: Map[String, String] = Map.empty, viewOriginalText: Option[String] = None, - viewText: Option[String] = None) { + viewText: Option[String] = None, + comment: Option[String] = None) { + + // Verify that the provided columns are part of the schema + private val colNames = schema.map(_.name).toSet + private def requireSubsetOfSchema(cols: Seq[String], colType: String): Unit = { + require(cols.toSet.subsetOf(colNames), s"$colType columns (${cols.mkString(", ")}) " + + s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'") + } + requireSubsetOfSchema(partitionColumnNames, "partition") + requireSubsetOfSchema(sortColumnNames, "sort") + requireSubsetOfSchema(bucketColumnNames, "bucket") + + /** Columns this table is partitioned by. */ + def partitionColumns: Seq[CatalogColumn] = + schema.filter { c => partitionColumnNames.contains(c.name) } /** Return the database this table was specified to belong to, assuming it exists. */ def database: String = identifier.database.getOrElse { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala index 0d9b0851fa7be..f961fe3292be3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala @@ -553,8 +553,12 @@ abstract class CatalogTestUtils { identifier = TableIdentifier(name, database), tableType = CatalogTableType.EXTERNAL_TABLE, storage = storageFormat, - schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")), - partitionColumns = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string"))) + schema = Seq( + CatalogColumn("col1", "int"), + CatalogColumn("col2", "string"), + CatalogColumn("a", "int"), + CatalogColumn("b", "string")), + partitionColumnNames = Seq("a", "b")) } def newFunc(name: String, database: Option[String] = None): CatalogFunction = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 73d9640c35f49..af92cecee5137 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -179,7 +179,9 @@ class SparkSqlAstBuilder extends AstBuilder { } } - /** Type to keep track of a table header. */ + /** + * Type to keep track of a table header: (identifier, isTemporary, ifNotExists, isExternal). + */ type TableHeader = (TableIdentifier, Boolean, Boolean, Boolean) /** @@ -616,10 +618,7 @@ class SparkSqlAstBuilder extends AstBuilder { case s: GenericFileFormatContext => (Seq.empty[String], Option(s.identifier.getText)) case s: TableFileFormatContext => - val elements = Seq(s.inFmt, s.outFmt) ++ - Option(s.serdeCls).toSeq ++ - Option(s.inDriver).toSeq ++ - Option(s.outDriver).toSeq + val elements = Seq(s.inFmt, s.outFmt) ++ Option(s.serdeCls).toSeq (elements.map(string), None) } AlterTableSetFileFormat( @@ -773,22 +772,6 @@ class SparkSqlAstBuilder extends AstBuilder { .map(_.identifier.getText)) } - /** - * Create a skew specification. This contains three components: - * - The Skewed Columns - * - Values for which are skewed. The size of each entry must match the number of skewed columns. - * - A store in directory flag. - */ - override def visitSkewSpec( - ctx: SkewSpecContext): (Seq[String], Seq[Seq[String]], Boolean) = withOrigin(ctx) { - val skewedValues = if (ctx.constantList != null) { - Seq(visitConstantList(ctx.constantList)) - } else { - visitNestedConstantList(ctx.nestedConstantList) - } - (visitIdentifierList(ctx.identifierList), skewedValues, ctx.DIRECTORIES != null) - } - /** * Convert a nested constants list into a sequence of string sequences. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 5137bd11d83ab..234099ad157f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -224,29 +224,6 @@ case class DropTable( } } -/** - * A command that renames a table/view. - * - * The syntax of this command is: - * {{{ - * ALTER TABLE table1 RENAME TO table2; - * ALTER VIEW view1 RENAME TO view2; - * }}} - */ -case class AlterTableRename( - oldName: TableIdentifier, - newName: TableIdentifier) - extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - val catalog = sqlContext.sessionState.catalog - catalog.invalidateTable(oldName) - catalog.renameTable(oldName, newName) - Seq.empty[Row] - } - -} - /** * A command that sets table/view properties. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala new file mode 100644 index 0000000000000..9c6030502dc22 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTable + + +// TODO: move the rest of the table commands from ddl.scala to this file + +/** + * A command to create a table. + * + * Note: This is currently used only for creating Hive tables. + * This is not intended for temporary tables. + * + * The syntax of using this command in SQL is: + * {{{ + * CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name + * [(col1 data_type [COMMENT col_comment], ...)] + * [COMMENT table_comment] + * [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)] + * [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS] + * [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...) + * [STORED AS DIRECTORIES] + * [ROW FORMAT row_format] + * [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]] + * [LOCATION path] + * [TBLPROPERTIES (property_name=property_value, ...)] + * [AS select_statement]; + * }}} + */ +case class CreateTable(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.sessionState.catalog.createTable(table, ifNotExists) + Seq.empty[Row] + } + +} + + +/** + * A command that renames a table/view. + * + * The syntax of this command is: + * {{{ + * ALTER TABLE table1 RENAME TO table2; + * ALTER VIEW view1 RENAME TO view2; + * }}} + */ +case class AlterTableRename( + oldName: TableIdentifier, + newName: TableIdentifier) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + val catalog = sqlContext.sessionState.catalog + catalog.invalidateTable(oldName) + catalog.renameTable(oldName, newName) + Seq.empty[Row] + } + +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 1c8dd6828673e..6e6475ee297c6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -440,37 +440,25 @@ class DDLCommandSuite extends PlanTest { } test("alter table: set file format") { - val sql1 = - """ - |ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' - |OUTPUTFORMAT 'test' SERDE 'test' INPUTDRIVER 'test' OUTPUTDRIVER 'test' - """.stripMargin - val sql2 = "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " + + val sql1 = "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " + "OUTPUTFORMAT 'test' SERDE 'test'" - val sql3 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " + + val sql2 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " + "SET FILEFORMAT PARQUET" val parsed1 = parser.parsePlan(sql1) val parsed2 = parser.parsePlan(sql2) - val parsed3 = parser.parsePlan(sql3) val tableIdent = TableIdentifier("table_name", None) val expected1 = AlterTableSetFileFormat( tableIdent, None, - List("test", "test", "test", "test", "test"), + List("test", "test", "test"), None)(sql1) val expected2 = AlterTableSetFileFormat( - tableIdent, - None, - List("test", "test", "test"), - None)(sql2) - val expected3 = AlterTableSetFileFormat( tableIdent, Some(Map("dt" -> "2008-08-08", "country" -> "us")), Seq(), - Some("PARQUET"))(sql3) + Some("PARQUET"))(sql2) comparePlans(parsed1, expected1) comparePlans(parsed2, expected2) - comparePlans(parsed3, expected3) } test("alter table: set location") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 40a8b0e614b2e..9ffffa0bdd6e7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -380,8 +380,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } - // TODO: ADD a testcase for Drop Database in Restric when we can create tables in SQLContext - test("show tables") { withTempTable("show1a", "show2b") { sql( diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index bfc3d195ff2ab..eb49eabcb1ba9 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -162,7 +162,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { runCliWithin(3.minute)( "CREATE TABLE hive_test(key INT, val STRING);" - -> "OK", + -> "", "SHOW TABLES;" -> "hive_test", s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE hive_test;" @@ -187,7 +187,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { "USE hive_test_db;" -> "", "CREATE TABLE hive_test(key INT, val STRING);" - -> "OK", + -> "", "SHOW TABLES;" -> "hive_test" ) @@ -210,9 +210,9 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { """CREATE TABLE t1(key string, val string) |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'; """.stripMargin - -> "OK", + -> "", "CREATE TABLE sourceTable (key INT, val STRING);" - -> "OK", + -> "", s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTable;" -> "OK", "INSERT INTO TABLE t1 SELECT key, val FROM sourceTable;" diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index f0eeda09dba5a..a45d180464602 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -366,10 +366,76 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "sort_merge_join_desc_6", "sort_merge_join_desc_7", + // These tests try to create a table with bucketed columns, which we don't support + "auto_join32", + "auto_join_filters", + "auto_smb_mapjoin_14", + "ct_case_insensitive", + "explain_rearrange", + "groupby_sort_10", + "groupby_sort_2", + "groupby_sort_3", + "groupby_sort_4", + "groupby_sort_5", + "groupby_sort_7", + "groupby_sort_8", + "groupby_sort_9", + "groupby_sort_test_1", + "inputddl4", + "join_filters", + "join_nulls", + "join_nullsafe", + "load_dyn_part2", + "orc_empty_files", + "reduce_deduplicate", + "smb_mapjoin9", + "smb_mapjoin_1", + "smb_mapjoin_10", + "smb_mapjoin_13", + "smb_mapjoin_14", + "smb_mapjoin_15", + "smb_mapjoin_16", + "smb_mapjoin_17", + "smb_mapjoin_2", + "smb_mapjoin_21", + "smb_mapjoin_25", + "smb_mapjoin_3", + "smb_mapjoin_4", + "smb_mapjoin_5", + "smb_mapjoin_6", + "smb_mapjoin_7", + "smb_mapjoin_8", + "sort_merge_join_desc_1", + "sort_merge_join_desc_2", + "sort_merge_join_desc_3", + "sort_merge_join_desc_4", + + // These tests try to create a table with skewed columns, which we don't support + "create_skewed_table1", + "skewjoinopt13", + "skewjoinopt18", + "skewjoinopt9", + // Index commands are not supported "drop_index", "drop_index_removes_partition_dirs", "alter_index", + "auto_sortmerge_join_1", + "auto_sortmerge_join_10", + "auto_sortmerge_join_11", + "auto_sortmerge_join_12", + "auto_sortmerge_join_13", + "auto_sortmerge_join_14", + "auto_sortmerge_join_15", + "auto_sortmerge_join_16", + "auto_sortmerge_join_2", + "auto_sortmerge_join_3", + "auto_sortmerge_join_4", + "auto_sortmerge_join_5", + "auto_sortmerge_join_6", + "auto_sortmerge_join_7", + "auto_sortmerge_join_8", + "auto_sortmerge_join_9", // Macro commands are not supported "macro", @@ -435,33 +501,14 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "auto_join3", "auto_join30", "auto_join31", - "auto_join32", "auto_join4", "auto_join5", "auto_join6", "auto_join7", "auto_join8", "auto_join9", - "auto_join_filters", "auto_join_nulls", "auto_join_reordering_values", - "auto_smb_mapjoin_14", - "auto_sortmerge_join_1", - "auto_sortmerge_join_10", - "auto_sortmerge_join_11", - "auto_sortmerge_join_12", - "auto_sortmerge_join_13", - "auto_sortmerge_join_14", - "auto_sortmerge_join_15", - "auto_sortmerge_join_16", - "auto_sortmerge_join_2", - "auto_sortmerge_join_3", - "auto_sortmerge_join_4", - "auto_sortmerge_join_5", - "auto_sortmerge_join_6", - "auto_sortmerge_join_7", - "auto_sortmerge_join_8", - "auto_sortmerge_join_9", "binary_constant", "binarysortable_1", "cast1", @@ -492,13 +539,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "create_insert_outputformat", "create_like_tbl_props", "create_nested_type", - "create_skewed_table1", "create_struct_table", "create_view_translate", "cross_join", "cross_product_check_1", "cross_product_check_2", - "ct_case_insensitive", "database_drop", "database_location", "database_properties", @@ -534,7 +579,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "escape_distributeby1", "escape_orderby1", "escape_sortby1", - "explain_rearrange", "fileformat_mix", "fileformat_sequencefile", "fileformat_text", @@ -589,16 +633,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "groupby_neg_float", "groupby_ppd", "groupby_ppr", - "groupby_sort_10", - "groupby_sort_2", - "groupby_sort_3", - "groupby_sort_4", - "groupby_sort_5", "groupby_sort_6", - "groupby_sort_7", - "groupby_sort_8", - "groupby_sort_9", - "groupby_sort_test_1", "having", "implicit_cast1", "index_serde", @@ -653,7 +688,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "inputddl1", "inputddl2", "inputddl3", - "inputddl4", "inputddl6", "inputddl7", "inputddl8", @@ -709,11 +743,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "join_array", "join_casesensitive", "join_empty", - "join_filters", "join_hive_626", "join_map_ppr", - "join_nulls", - "join_nullsafe", "join_rc", "join_reorder2", "join_reorder3", @@ -737,7 +768,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "load_dyn_part13", "load_dyn_part14", "load_dyn_part14_win", - "load_dyn_part2", "load_dyn_part3", "load_dyn_part4", "load_dyn_part5", @@ -790,7 +820,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "nullscript", "optional_outer", "orc_dictionary_threshold", - "orc_empty_files", "order", "order2", "outer_join_ppr", @@ -846,7 +875,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "rcfile_null_value", "rcfile_toleratecorruptions", "rcfile_union", - "reduce_deduplicate", "reduce_deduplicate_exclude_gby", "reduce_deduplicate_exclude_join", "reduce_deduplicate_extended", @@ -867,31 +895,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "show_functions", "show_partitions", "show_tblproperties", - "skewjoinopt13", - "skewjoinopt18", - "skewjoinopt9", - "smb_mapjoin9", - "smb_mapjoin_1", - "smb_mapjoin_10", - "smb_mapjoin_13", - "smb_mapjoin_14", - "smb_mapjoin_15", - "smb_mapjoin_16", - "smb_mapjoin_17", - "smb_mapjoin_2", - "smb_mapjoin_21", - "smb_mapjoin_25", - "smb_mapjoin_3", - "smb_mapjoin_4", - "smb_mapjoin_5", - "smb_mapjoin_6", - "smb_mapjoin_7", - "smb_mapjoin_8", "sort", - "sort_merge_join_desc_1", - "sort_merge_join_desc_2", - "sort_merge_join_desc_3", - "sort_merge_join_desc_4", "stats0", "stats_aggregator_error_1", "stats_empty_partition", diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 14f331961ef4a..ccc8345d7375d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -91,7 +91,7 @@ private[hive] object HiveSerDe { "textfile" -> HiveSerDe( inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"), - outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")), + outputFormat = Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), "avro" -> HiveSerDe( @@ -905,8 +905,13 @@ private[hive] case class MetastoreRelation( val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor() tTable.setSd(sd) - sd.setCols(table.schema.map(toHiveColumn).asJava) - tTable.setPartitionKeys(table.partitionColumns.map(toHiveColumn).asJava) + + // Note: In Hive the schema and partition columns must be disjoint sets + val (partCols, schema) = table.schema.map(toHiveColumn).partition { c => + table.partitionColumnNames.contains(c.getName) + } + sd.setCols(schema.asJava) + tTable.setPartitionKeys(partCols.asJava) table.storage.locationUri.foreach(sd.setLocation) table.storage.inputFormat.foreach(sd.setInputFormat) @@ -1013,7 +1018,10 @@ private[hive] case class MetastoreRelation( val partitionKeys = table.partitionColumns.map(_.toAttribute) /** Non-partitionKey attributes */ - val attributes = table.schema.map(_.toAttribute) + // TODO: just make this hold the schema itself, not just non-partition columns + val attributes = table.schema + .filter { c => !table.partitionColumnNames.contains(c.name) } + .map(_.toAttribute) val output = attributes ++ partitionKeys diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 39e26acd7fe9b..82a9dceb7fd9a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -299,6 +299,10 @@ private[hive] class HiveClientImpl( tableName: String): Option[CatalogTable] = withHiveState { logDebug(s"Looking up $dbName.$tableName") Option(client.getTable(dbName, tableName, false)).map { h => + // Note: Hive separates partition columns and the schema, but for us the + // partition columns are part of the schema + val partCols = h.getPartCols.asScala.map(fromHiveColumn) + val schema = h.getCols.asScala.map(fromHiveColumn) ++ partCols CatalogTable( identifier = TableIdentifier(h.getTableName, Option(h.getDbName)), tableType = h.getTableType match { @@ -307,9 +311,10 @@ private[hive] class HiveClientImpl( case HiveTableType.INDEX_TABLE => CatalogTableType.INDEX_TABLE case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIRTUAL_VIEW }, - schema = h.getCols.asScala.map(fromHiveColumn), - partitionColumns = h.getPartCols.asScala.map(fromHiveColumn), - sortColumns = Seq(), + schema = schema, + partitionColumnNames = partCols.map(_.name), + sortColumnNames = Seq(), // TODO: populate this + bucketColumnNames = h.getBucketCols.asScala, numBuckets = h.getNumBuckets, createTime = h.getTTable.getCreateTime.toLong * 1000, lastAccessTime = h.getLastAccessTime.toLong * 1000, @@ -675,24 +680,37 @@ private[hive] class HiveClientImpl( private def toHiveTable(table: CatalogTable): HiveTable = { val hiveTable = new HiveTable(table.database, table.identifier.table) - // For EXTERNAL_TABLE/MANAGED_TABLE, we also need to set EXTERNAL field in - // the table properties accodringly. Otherwise, if EXTERNAL_TABLE is the table type - // but EXTERNAL field is not set, Hive metastore will change the type to - // MANAGED_TABLE (see - // metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1095-L1105) + // For EXTERNAL_TABLE, we also need to set EXTERNAL field in the table properties. + // Otherwise, Hive metastore will change the table to a MANAGED_TABLE. + // (metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1095-L1105) hiveTable.setTableType(table.tableType match { case CatalogTableType.EXTERNAL_TABLE => hiveTable.setProperty("EXTERNAL", "TRUE") HiveTableType.EXTERNAL_TABLE case CatalogTableType.MANAGED_TABLE => - hiveTable.setProperty("EXTERNAL", "FALSE") HiveTableType.MANAGED_TABLE case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW }) - hiveTable.setFields(table.schema.map(toHiveColumn).asJava) - hiveTable.setPartCols(table.partitionColumns.map(toHiveColumn).asJava) + // Note: In Hive the schema and partition columns must be disjoint sets + val (partCols, schema) = table.schema.map(toHiveColumn).partition { c => + table.partitionColumnNames.contains(c.getName) + } + if (table.schema.isEmpty) { + // This is a hack to preserve an existing behavior. For data source table, + // we allow users to not provide a schema. Before Spark 2.0, + // there is one field called "col" got automatically populated. + // At here, we add this col to here explicitly when no schema is defined + // because we always set the LazySimpleSerde as the default SerDe and + // Hive stops to automatically populate this field. + hiveTable.setFields( + Seq(new FieldSchema("col", "array", "from deserializer")).asJava) + } else { + hiveTable.setFields(schema.asJava) + } + hiveTable.setPartCols(partCols.asJava) // TODO: set sort columns here too + hiveTable.setBucketCols(table.bucketColumnNames.asJava) hiveTable.setOwner(conf.getUser) hiveTable.setNumBuckets(table.numBuckets) hiveTable.setCreateTime((table.createTime / 1000).toInt) @@ -700,9 +718,11 @@ private[hive] class HiveClientImpl( table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) } table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass) table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass) - table.storage.serde.foreach(hiveTable.setSerializationLib) + hiveTable.setSerializationLib( + table.storage.serde.getOrElse("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) table.storage.serdeProperties.foreach { case (k, v) => hiveTable.setSerdeParam(k, v) } table.properties.foreach { case (k, v) => hiveTable.setProperty(k, v) } + table.comment.foreach { c => hiveTable.setProperty("comment", c) } table.viewOriginalText.foreach { t => hiveTable.setViewOriginalText(t) } table.viewText.foreach { t => hiveTable.setViewExpandedText(t) } hiveTable diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala index 7a435117e7a82..b14db7fe71619 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ @@ -33,8 +34,9 @@ import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkSqlAstBuilder +import org.apache.spark.sql.execution.command.CreateTable import org.apache.spark.sql.hive.{CreateTableAsSelect => CTAS, CreateViewAsSelect => CreateView} -import org.apache.spark.sql.hive.{HiveGenericUDTF, HiveSerDe} +import org.apache.spark.sql.hive.{HiveGenericUDTF, HiveMetastoreTypes, HiveSerDe} import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper /** @@ -121,84 +123,116 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder { } /** - * Create a [[CatalogStorageFormat]]. This is part of the [[CreateTableAsSelect]] command. + * Create a [[CatalogStorageFormat]] for creating tables. */ override def visitCreateFileFormat( ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { - if (ctx.storageHandler == null) { - typedVisit[CatalogStorageFormat](ctx.fileFormat) - } else { - visitStorageHandler(ctx.storageHandler) + (ctx.fileFormat, ctx.storageHandler) match { + // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format + case (c: TableFileFormatContext, null) => + visitTableFileFormat(c) + // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO + case (c: GenericFileFormatContext, null) => + visitGenericFileFormat(c) + case (null, storageHandler) => + throw new ParseException("Operation not allowed: ... STORED BY storage_handler ...", ctx) + case _ => + throw new ParseException("expected either STORED AS or STORED BY, not both", ctx) } } /** - * Create a [[CreateTableAsSelect]] command. + * Create a table, returning either a [[CreateTable]] or a [[CreateTableAsSelect]]. + * + * This is not used to create datasource tables, which is handled through + * "CREATE TABLE ... USING ...". + * + * Note: several features are currently not supported - temporary tables, bucketing, + * skewed columns and storage handlers (STORED BY). + * + * Expected format: + * {{{ + * CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name + * [(col1 data_type [COMMENT col_comment], ...)] + * [COMMENT table_comment] + * [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)] + * [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS] + * [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...) [STORED AS DIRECTORIES]] + * [ROW FORMAT row_format] + * [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]] + * [LOCATION path] + * [TBLPROPERTIES (property_name=property_value, ...)] + * [AS select_statement]; + * }}} */ - override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = { - if (ctx.query == null) { - HiveNativeCommand(command(ctx)) + override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) { + val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) + // TODO: implement temporary tables + if (temp) { + throw new ParseException( + "CREATE TEMPORARY TABLE is not supported yet. " + + "Please use registerTempTable as an alternative.", ctx) + } + if (ctx.skewSpec != null) { + throw new ParseException("Operation not allowed: CREATE TABLE ... SKEWED BY ...", ctx) + } + if (ctx.bucketSpec != null) { + throw new ParseException("Operation not allowed: CREATE TABLE ... CLUSTERED BY ...", ctx) + } + val tableType = if (external) { + CatalogTableType.EXTERNAL_TABLE } else { - // Get the table header. - val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) - val tableType = if (external) { - CatalogTableType.EXTERNAL_TABLE - } else { - CatalogTableType.MANAGED_TABLE - } - - // Unsupported clauses. - if (temp) { - throw new ParseException(s"Unsupported operation: TEMPORARY clause.", ctx) - } - if (ctx.bucketSpec != null) { - // TODO add this - we need cluster columns in the CatalogTable for this to work. - throw new ParseException("Unsupported operation: " + - "CLUSTERED BY ... [ORDERED BY ...] INTO ... BUCKETS clause.", ctx) - } - if (ctx.skewSpec != null) { - throw new ParseException("Operation not allowed: " + - "SKEWED BY ... ON ... [STORED AS DIRECTORIES] clause.", ctx) - } - - // Create the schema. - val schema = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns(_, _.toLowerCase)) - - // Get the column by which the table is partitioned. - val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns(_)) - - // Create the storage. - def format(fmt: ParserRuleContext): CatalogStorageFormat = { - Option(fmt).map(typedVisit[CatalogStorageFormat]).getOrElse(EmptyStorageFormat) - } - // Default storage. + CatalogTableType.MANAGED_TABLE + } + val comment = Option(ctx.STRING).map(string) + val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns) + val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns) + val properties = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty) + val selectQuery = Option(ctx.query).map(plan) + + // Note: Hive requires partition columns to be distinct from the schema, so we need + // to include the partition columns here explicitly + val schema = cols ++ partitionCols + + // Storage format + val defaultStorage: CatalogStorageFormat = { val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT) - val hiveSerDe = HiveSerDe.sourceToSerDe(defaultStorageType, hiveConf).getOrElse { - HiveSerDe( - inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"), - outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")) - } - // Defined storage. - val fileStorage = format(ctx.createFileFormat) - val rowStorage = format(ctx.rowFormat) - val storage = CatalogStorageFormat( - Option(ctx.locationSpec).map(visitLocationSpec), - fileStorage.inputFormat.orElse(hiveSerDe.inputFormat), - fileStorage.outputFormat.orElse(hiveSerDe.outputFormat), - rowStorage.serde.orElse(hiveSerDe.serde).orElse(fileStorage.serde), - rowStorage.serdeProperties ++ fileStorage.serdeProperties - ) + val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, hiveConf) + CatalogStorageFormat( + locationUri = None, + inputFormat = defaultHiveSerde.flatMap(_.inputFormat) + .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")), + outputFormat = defaultHiveSerde.flatMap(_.outputFormat) + .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), + // Note: Keep this unspecified because we use the presence of the serde to decide + // whether to convert a table created by CTAS to a datasource table. + serde = None, + serdeProperties = Map()) + } + val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat) + .getOrElse(EmptyStorageFormat) + val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat) + val location = Option(ctx.locationSpec).map(visitLocationSpec) + val storage = CatalogStorageFormat( + locationUri = location, + inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat), + outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat), + serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde), + serdeProperties = rowStorage.serdeProperties ++ fileStorage.serdeProperties) + + // TODO support the sql text - have a proper location for this! + val tableDesc = CatalogTable( + identifier = name, + tableType = tableType, + storage = storage, + schema = schema, + partitionColumnNames = partitionCols.map(_.name), + properties = properties, + comment = comment) - val tableDesc = CatalogTable( - identifier = table, - tableType = tableType, - schema = schema, - partitionColumns = partitionCols, - storage = storage, - properties = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty), - // TODO support the sql text - have a proper location for this! - viewText = Option(ctx.STRING).map(string)) - CTAS(tableDesc, plan(ctx.query), ifNotExists) + selectQuery match { + case Some(q) => CTAS(tableDesc, q, ifNotExists) + case None => CreateTable(tableDesc, ifNotExists) } } @@ -353,25 +387,19 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder { private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, Map.empty) /** - * Create a [[CatalogStorageFormat]]. The INPUTDRIVER and OUTPUTDRIVER clauses are currently - * ignored. + * Create a [[CatalogStorageFormat]]. */ override def visitTableFileFormat( ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { - import ctx._ - if (inDriver != null || outDriver != null) { - throw new ParseException( - s"Operation not allowed: INPUTDRIVER ... OUTPUTDRIVER ... clauses", ctx) - } EmptyStorageFormat.copy( - inputFormat = Option(string(inFmt)), - outputFormat = Option(string(outFmt)), - serde = Option(serdeCls).map(string) + inputFormat = Option(string(ctx.inFmt)), + outputFormat = Option(string(ctx.outFmt)), + serde = Option(ctx.serdeCls).map(string) ) } /** - * Resolve a [[HiveSerDe]] based on the format name given. + * Resolve a [[HiveSerDe]] based on the name given and return it as a [[CatalogStorageFormat]]. */ override def visitGenericFileFormat( ctx: GenericFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { @@ -388,11 +416,28 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder { } /** - * Storage Handlers are currently not supported in the statements we support (CTAS). + * Create a [[RowFormat]] used for creating tables. + * + * Example format: + * {{{ + * SERDE serde_name [WITH SERDEPROPERTIES (k1=v1, k2=v2, ...)] + * }}} + * + * OR + * + * {{{ + * DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] + * [COLLECTION ITEMS TERMINATED BY char] + * [MAP KEYS TERMINATED BY char] + * [LINES TERMINATED BY char] + * [NULL DEFINED AS char] + * }}} */ - override def visitStorageHandler( - ctx: StorageHandlerContext): CatalogStorageFormat = withOrigin(ctx) { - throw new ParseException("Storage Handlers are currently unsupported.", ctx) + private def visitRowFormat(ctx: RowFormatContext): CatalogStorageFormat = withOrigin(ctx) { + ctx match { + case serde: RowFormatSerdeContext => visitRowFormatSerde(serde) + case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited) + } } /** @@ -435,13 +480,15 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder { /** * Create a sequence of [[CatalogColumn]]s from a column list */ - private def visitCatalogColumns( - ctx: ColTypeListContext, - formatter: String => String = identity): Seq[CatalogColumn] = withOrigin(ctx) { + private def visitCatalogColumns(ctx: ColTypeListContext): Seq[CatalogColumn] = withOrigin(ctx) { ctx.colType.asScala.map { col => CatalogColumn( - formatter(col.identifier.getText), - col.dataType.getText.toLowerCase, // TODO validate this? + col.identifier.getText.toLowerCase, + // Note: for types like "STRUCT" we can't + // just convert the whole type string to lower case, otherwise the struct field names + // will no longer be case sensitive. Instead, we rely on our parser to get the proper + // case before passing it to Hive. + CatalystSqlParser.parseDataType(col.dataType.getText).simpleString, nullable = true, Option(col.STRING).map(string)) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index e8086aec327bf..68d3ea6ed93db 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation} +import org.apache.spark.sql.execution.command.CreateTable import org.apache.spark.sql.hive.execution.{HiveNativeCommand, HiveSqlParser} class HiveDDLCommandSuite extends PlanTest { @@ -36,6 +37,7 @@ class HiveDDLCommandSuite extends PlanTest { private def extractTableDesc(sql: String): (CatalogTable, Boolean) = { parser.parsePlan(sql).collect { + case CreateTable(desc, allowExisting) => (desc, allowExisting) case CreateTableAsSelect(desc, _, allowExisting) => (desc, allowExisting) case CreateViewAsSelect(desc, _, allowExisting, _, _) => (desc, allowExisting) }.head @@ -76,9 +78,12 @@ class HiveDDLCommandSuite extends PlanTest { CatalogColumn("page_url", "string") :: CatalogColumn("referrer_url", "string") :: CatalogColumn("ip", "string", comment = Some("IP Address of the User")) :: - CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil) + CatalogColumn("country", "string", comment = Some("country of origination")) :: + CatalogColumn("dt", "string", comment = Some("date type")) :: + CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil) + assert(desc.comment == Some("This is the staging page view table")) // TODO will be SQLText - assert(desc.viewText == Option("This is the staging page view table")) + assert(desc.viewText.isEmpty) assert(desc.viewOriginalText.isEmpty) assert(desc.partitionColumns == CatalogColumn("dt", "string", comment = Some("date type")) :: @@ -123,9 +128,12 @@ class HiveDDLCommandSuite extends PlanTest { CatalogColumn("page_url", "string") :: CatalogColumn("referrer_url", "string") :: CatalogColumn("ip", "string", comment = Some("IP Address of the User")) :: - CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil) + CatalogColumn("country", "string", comment = Some("country of origination")) :: + CatalogColumn("dt", "string", comment = Some("date type")) :: + CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil) // TODO will be SQLText - assert(desc.viewText == Option("This is the staging page view table")) + assert(desc.comment == Some("This is the staging page view table")) + assert(desc.viewText.isEmpty) assert(desc.viewOriginalText.isEmpty) assert(desc.partitionColumns == CatalogColumn("dt", "string", comment = Some("date type")) :: @@ -151,7 +159,7 @@ class HiveDDLCommandSuite extends PlanTest { assert(desc.storage.serdeProperties == Map()) assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat")) assert(desc.storage.outputFormat == - Some("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")) + Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) assert(desc.storage.serde.isEmpty) assert(desc.properties == Map()) } @@ -203,17 +211,6 @@ class HiveDDLCommandSuite extends PlanTest { |AS SELECT key, value FROM src ORDER BY key, value """.stripMargin) } - intercept[ParseException] { - parser.parsePlan( - """CREATE TABLE ctas2 - |STORED AS - |INPUTFORMAT "org.apache.hadoop.mapred.TextInputFormat" - |OUTPUTFORMAT "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat" - |INPUTDRIVER "org.apache.hadoop.hive.howl.rcfile.RCFileInputDriver" - |OUTPUTDRIVER "org.apache.hadoop.hive.howl.rcfile.RCFileOutputDriver" - |AS SELECT key, value FROM src ORDER BY key, value - """.stripMargin) - } intercept[ParseException] { parser.parsePlan( """ @@ -324,6 +321,194 @@ class HiveDDLCommandSuite extends PlanTest { """.stripMargin) } + test("create table - basic") { + val query = "CREATE TABLE my_table (id int, name string)" + val (desc, allowExisting) = extractTableDesc(query) + assert(!allowExisting) + assert(desc.identifier.database.isEmpty) + assert(desc.identifier.table == "my_table") + assert(desc.tableType == CatalogTableType.MANAGED_TABLE) + assert(desc.schema == Seq(CatalogColumn("id", "int"), CatalogColumn("name", "string"))) + assert(desc.partitionColumnNames.isEmpty) + assert(desc.sortColumnNames.isEmpty) + assert(desc.bucketColumnNames.isEmpty) + assert(desc.numBuckets == -1) + assert(desc.viewText.isEmpty) + assert(desc.viewOriginalText.isEmpty) + assert(desc.storage.locationUri.isEmpty) + assert(desc.storage.inputFormat == + Some("org.apache.hadoop.mapred.TextInputFormat")) + assert(desc.storage.outputFormat == + Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) + assert(desc.storage.serde.isEmpty) + assert(desc.storage.serdeProperties.isEmpty) + assert(desc.properties.isEmpty) + assert(desc.comment.isEmpty) + } + + test("create table - with database name") { + val query = "CREATE TABLE dbx.my_table (id int, name string)" + val (desc, _) = extractTableDesc(query) + assert(desc.identifier.database == Some("dbx")) + assert(desc.identifier.table == "my_table") + } + + test("create table - temporary") { + val query = "CREATE TEMPORARY TABLE tab1 (id int, name string)" + val e = intercept[ParseException] { parser.parsePlan(query) } + assert(e.message.contains("registerTempTable")) + } + + test("create table - external") { + val query = "CREATE EXTERNAL TABLE tab1 (id int, name string)" + val (desc, _) = extractTableDesc(query) + assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE) + } + + test("create table - if not exists") { + val query = "CREATE TABLE IF NOT EXISTS tab1 (id int, name string)" + val (_, allowExisting) = extractTableDesc(query) + assert(allowExisting) + } + + test("create table - comment") { + val query = "CREATE TABLE my_table (id int, name string) COMMENT 'its hot as hell below'" + val (desc, _) = extractTableDesc(query) + assert(desc.comment == Some("its hot as hell below")) + } + + test("create table - partitioned columns") { + val query = "CREATE TABLE my_table (id int, name string) PARTITIONED BY (month int)" + val (desc, _) = extractTableDesc(query) + assert(desc.schema == Seq( + CatalogColumn("id", "int"), + CatalogColumn("name", "string"), + CatalogColumn("month", "int"))) + assert(desc.partitionColumnNames == Seq("month")) + } + + test("create table - clustered by") { + val baseQuery = "CREATE TABLE my_table (id int, name string) CLUSTERED BY(id)" + val query1 = s"$baseQuery INTO 10 BUCKETS" + val query2 = s"$baseQuery SORTED BY(id) INTO 10 BUCKETS" + val e1 = intercept[ParseException] { parser.parsePlan(query1) } + val e2 = intercept[ParseException] { parser.parsePlan(query2) } + assert(e1.getMessage.contains("Operation not allowed")) + assert(e2.getMessage.contains("Operation not allowed")) + } + + test("create table - skewed by") { + val baseQuery = "CREATE TABLE my_table (id int, name string) SKEWED BY" + val query1 = s"$baseQuery(id) ON (1, 10, 100)" + val query2 = s"$baseQuery(id, name) ON ((1, 'x'), (2, 'y'), (3, 'z'))" + val query3 = s"$baseQuery(id, name) ON ((1, 'x'), (2, 'y'), (3, 'z')) STORED AS DIRECTORIES" + val e1 = intercept[ParseException] { parser.parsePlan(query1) } + val e2 = intercept[ParseException] { parser.parsePlan(query2) } + val e3 = intercept[ParseException] { parser.parsePlan(query3) } + assert(e1.getMessage.contains("Operation not allowed")) + assert(e2.getMessage.contains("Operation not allowed")) + assert(e3.getMessage.contains("Operation not allowed")) + } + + test("create table - row format") { + val baseQuery = "CREATE TABLE my_table (id int, name string) ROW FORMAT" + val query1 = s"$baseQuery SERDE 'org.apache.poof.serde.Baff'" + val query2 = s"$baseQuery SERDE 'org.apache.poof.serde.Baff' WITH SERDEPROPERTIES ('k1'='v1')" + val query3 = + s""" + |$baseQuery DELIMITED FIELDS TERMINATED BY 'x' ESCAPED BY 'y' + |COLLECTION ITEMS TERMINATED BY 'a' + |MAP KEYS TERMINATED BY 'b' + |LINES TERMINATED BY '\n' + |NULL DEFINED AS 'c' + """.stripMargin + val (desc1, _) = extractTableDesc(query1) + val (desc2, _) = extractTableDesc(query2) + val (desc3, _) = extractTableDesc(query3) + assert(desc1.storage.serde == Some("org.apache.poof.serde.Baff")) + assert(desc1.storage.serdeProperties.isEmpty) + assert(desc2.storage.serde == Some("org.apache.poof.serde.Baff")) + assert(desc2.storage.serdeProperties == Map("k1" -> "v1")) + assert(desc3.storage.serdeProperties == Map( + "field.delim" -> "x", + "escape.delim" -> "y", + "serialization.format" -> "x", + "line.delim" -> "\n", + "colelction.delim" -> "a", // yes, it's a typo from Hive :) + "mapkey.delim" -> "b")) + } + + test("create table - file format") { + val baseQuery = "CREATE TABLE my_table (id int, name string) STORED AS" + val query1 = s"$baseQuery INPUTFORMAT 'winput' OUTPUTFORMAT 'wowput'" + val query2 = s"$baseQuery ORC" + val (desc1, _) = extractTableDesc(query1) + val (desc2, _) = extractTableDesc(query2) + assert(desc1.storage.inputFormat == Some("winput")) + assert(desc1.storage.outputFormat == Some("wowput")) + assert(desc1.storage.serde.isEmpty) + assert(desc2.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat")) + assert(desc2.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")) + assert(desc2.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) + } + + test("create table - storage handler") { + val baseQuery = "CREATE TABLE my_table (id int, name string) STORED BY" + val query1 = s"$baseQuery 'org.papachi.StorageHandler'" + val query2 = s"$baseQuery 'org.mamachi.StorageHandler' WITH SERDEPROPERTIES ('k1'='v1')" + val e1 = intercept[ParseException] { parser.parsePlan(query1) } + val e2 = intercept[ParseException] { parser.parsePlan(query2) } + assert(e1.getMessage.contains("Operation not allowed")) + assert(e2.getMessage.contains("Operation not allowed")) + } + + test("create table - location") { + val query = "CREATE TABLE my_table (id int, name string) LOCATION '/path/to/mars'" + val (desc, _) = extractTableDesc(query) + assert(desc.storage.locationUri == Some("/path/to/mars")) + } + + test("create table - properties") { + val query = "CREATE TABLE my_table (id int, name string) TBLPROPERTIES ('k1'='v1', 'k2'='v2')" + val (desc, _) = extractTableDesc(query) + assert(desc.properties == Map("k1" -> "v1", "k2" -> "v2")) + } + + test("create table - everything!") { + val query = + """ + |CREATE EXTERNAL TABLE IF NOT EXISTS dbx.my_table (id int, name string) + |COMMENT 'no comment' + |PARTITIONED BY (month int) + |ROW FORMAT SERDE 'org.apache.poof.serde.Baff' WITH SERDEPROPERTIES ('k1'='v1') + |STORED AS INPUTFORMAT 'winput' OUTPUTFORMAT 'wowput' + |LOCATION '/path/to/mercury' + |TBLPROPERTIES ('k1'='v1', 'k2'='v2') + """.stripMargin + val (desc, allowExisting) = extractTableDesc(query) + assert(allowExisting) + assert(desc.identifier.database == Some("dbx")) + assert(desc.identifier.table == "my_table") + assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE) + assert(desc.schema == Seq( + CatalogColumn("id", "int"), + CatalogColumn("name", "string"), + CatalogColumn("month", "int"))) + assert(desc.partitionColumnNames == Seq("month")) + assert(desc.sortColumnNames.isEmpty) + assert(desc.bucketColumnNames.isEmpty) + assert(desc.numBuckets == -1) + assert(desc.viewText.isEmpty) + assert(desc.viewOriginalText.isEmpty) + assert(desc.storage.locationUri == Some("/path/to/mercury")) + assert(desc.storage.inputFormat == Some("winput")) + assert(desc.storage.outputFormat == Some("wowput")) + assert(desc.storage.serde == Some("org.apache.poof.serde.Baff")) + assert(desc.storage.serdeProperties == Map("k1" -> "v1")) + assert(desc.properties == Map("k1" -> "v1", "k2" -> "v2")) + assert(desc.comment == Some("no comment")) + } + test("create view -- basic") { val v1 = "CREATE VIEW view1 AS SELECT * FROM tab1" val (desc, exists) = extractTableDesc(v1) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index ada8621d07579..8648834f0d881 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -88,7 +88,7 @@ class DataSourceWithHiveMetastoreCatalogSuite assert(hiveTable.storage.outputFormat === Some(outputFormat)) assert(hiveTable.storage.serde === Some(serde)) - assert(hiveTable.partitionColumns.isEmpty) + assert(hiveTable.partitionColumnNames.isEmpty) assert(hiveTable.tableType === CatalogTableType.MANAGED_TABLE) val columns = hiveTable.schema @@ -151,7 +151,7 @@ class DataSourceWithHiveMetastoreCatalogSuite assert(hiveTable.storage.outputFormat === Some(outputFormat)) assert(hiveTable.storage.serde === Some(serde)) - assert(hiveTable.partitionColumns.isEmpty) + assert(hiveTable.partitionColumnNames.isEmpty) assert(hiveTable.tableType === CatalogTableType.EXTERNAL_TABLE) val columns = hiveTable.schema diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 40e9c9362cf5e..4db95636e7610 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -81,7 +81,7 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef test("Double create fails when allowExisting = false") { sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)") - intercept[QueryExecutionException] { + intercept[AnalysisException] { sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)") } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala index 37c01792d9c3f..97cb9d972081c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala @@ -149,7 +149,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter { val (actualScannedColumns, actualPartValues) = plan.collect { case p @ HiveTableScan(columns, relation, _) => val columnNames = columns.map(_.name) - val partValues = if (relation.table.partitionColumns.nonEmpty) { + val partValues = if (relation.table.partitionColumnNames.nonEmpty) { p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues) } else { Seq.empty diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 7eaf19dfe9fd2..5ce16be4dc059 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -360,7 +360,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { var message = intercept[AnalysisException] { sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value") }.getMessage - assert(message.contains("ctas1 already exists")) + assert(message.contains("already exists")) checkRelation("ctas1", true) sql("DROP TABLE ctas1")