From 2498b95960c2dee18072cdb7ecd87d2ce6b949f4 Mon Sep 17 00:00:00 2001
From: Kris Geusebroek
Date: Fri, 27 Jul 2018 14:57:43 +0200
Subject: [PATCH 1/4] Support alter table/partition file format

---
 .../spark/sql/catalyst/parser/SqlBase.g4      |   2 +
 .../sql/catalyst/catalog/interface.scala      |   3 +-
 .../spark/sql/execution/SparkSqlParser.scala  |  26 ++
 .../spark/sql/execution/command/ddl.scala     |  45 +++
 .../execution/MultiFormatTableSuite.scala     | 284 ++++++++++++++++++
 5 files changed, 359 insertions(+), 1 deletion(-)
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala

diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 2aca10f1bfbc7..a6da481a619af 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -108,6 +108,8 @@ statement
         SET SERDE STRING (WITH SERDEPROPERTIES tablePropertyList)?        #setTableSerDe
     | ALTER TABLE tableIdentifier (partitionSpec)?
         SET SERDEPROPERTIES tablePropertyList                             #setTableSerDe
+    | ALTER TABLE tableIdentifier (partitionSpec)?
+        SET FILEFORMAT fileFormat                                         #setTableFormat
     | ALTER TABLE tableIdentifier ADD (IF NOT EXISTS)?
         partitionSpecLocation+                                            #addTablePartition
     | ALTER VIEW tableIdentifier ADD (IF NOT EXISTS)?
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index a4ead538bb51a..575b7c68a18cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -240,7 +240,8 @@ case class CatalogTable(
     unsupportedFeatures: Seq[String] = Seq.empty,
     tracksPartitionsInCatalog: Boolean = false,
     schemaPreservesCase: Boolean = true,
-    ignoredProperties: Map[String, String] = Map.empty) {
+    ignoredProperties: Map[String, String] = Map.empty,
+    hasMultiFormatPartitions: Boolean = false) {
 
   import CatalogTable._
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 4828fa60a7b58..b968ae581296c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -857,6 +857,32 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
   }
 
+  /**
+   * Create an [[AlterTableFormatCommand]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table [PARTITION spec] SET FILEFORMAT format;
+   * }}}
+   */
+  override def visitSetTableFormat(ctx: SetTableFormatContext): LogicalPlan = withOrigin(ctx) {
+    val format = (ctx.fileFormat) match {
+      // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format
+      case (c: TableFileFormatContext) =>
+        visitTableFileFormat(c)
+      // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO
+      case (c: GenericFileFormatContext) =>
+        visitGenericFileFormat(c)
+      case _ =>
+        throw new ParseException("Expected STORED AS ", ctx)
+    }
+    AlterTableFormatCommand(
+      visitTableIdentifier(ctx.tableIdentifier),
+      format,
+      // TODO a partition spec is allowed to have optional values. This is currently violated.
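+      //      (a spec that omits a value, e.g. PARTITION (dt), parses but is rejected by
+      //      visitNonOptionalPartitionSpec below)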
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
+  }
+
   /**
    * Create an [[AlterTableAddPartitionCommand]] command.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index e1faecedd20ed..de78bf0f4d9bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -415,6 +415,51 @@ case class AlterTableSerDePropertiesCommand(
 
 }
 
+/**
+ * A command that sets the file format of a table or partition.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table [PARTITION spec] SET FILEFORMAT format;
+ * }}}
+ */
+case class AlterTableFormatCommand(
+  tableName: TableIdentifier,
+  format: CatalogStorageFormat,
+  partSpec: Option[TablePartitionSpec])
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val table = catalog.getTableMetadata(tableName)
+    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
+    // For datasource tables, disallow setting the file format of a specific partition
+    if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) {
+      throw new AnalysisException("Operation not allowed: ALTER TABLE SET FILEFORMAT " +
+        "for a specific partition is not supported " +
+        "for tables created with the datasource API")
+    }
+    if (partSpec.isEmpty) {
+      val newTable = table.withNewStorage(
+        serde = format.serde.orElse(table.storage.serde),
+        inputFormat = format.inputFormat.orElse(table.storage.inputFormat),
+        outputFormat = format.outputFormat.orElse(table.storage.outputFormat),
+        properties = table.storage.properties ++ format.properties)
+      catalog.alterTable(newTable)
+    } else {
+      val spec = partSpec.get
+      val part = catalog.getPartition(table.identifier, spec)
+      // Fall back to the partition's existing storage for any part of the format
+      // that is not specified (the table-level fallback was a copy-paste slip).
+      val newPart = part.copy(storage = part.storage.copy(
+        serde = format.serde.orElse(part.storage.serde),
+        inputFormat = format.inputFormat.orElse(part.storage.inputFormat),
+        outputFormat = format.outputFormat.orElse(part.storage.outputFormat),
+        properties = part.storage.properties ++ format.properties))
+      catalog.alterPartitions(table.identifier, Seq(newPart))
+    }
+    Seq.empty[Row]
+  }
+}
+
 /**
  * Add Partition in ALTER TABLE: add the table partitions.
  *
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala
new file mode 100644
index 0000000000000..4a966ed4f4c34
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.File
+import java.net.URI
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.Matchers
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+class MultiFormatTableSuite
+  extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach with Matchers {
+  import testImplicits._
+
+  val parser = new SparkSqlParser(new SQLConf())
+
+  override def afterEach(): Unit = {
+    try {
+      // drop all databases, tables and functions after each test
+      spark.sessionState.catalog.reset()
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  val partitionCol = "dt"
+  val partitionVal1 = "2018-01-26"
+  val partitionVal2 = "2018-01-27"
+  private case class PartitionDefinition(
+    column: String,
+    value: String,
+    location: URI,
+    format: Option[String] = None
+  ) {
+    def toSpec: String = {
+      s"($column='$value')"
+    }
+    def toSpecAsMap: Map[String, String] = {
+      Map(column -> value)
+    }
+  }
+
+  test("create hive table with multi format partitions") {
+    val catalog = spark.sessionState.catalog
+    withTempDir { baseDir =>
+
+      val partitionedTable = "ext_multiformat_partition_table"
+      withTable(partitionedTable) {
+        assert(baseDir.listFiles.isEmpty)
+
+        val partitions = createMultiformatPartitionDefinitions(baseDir)
+
+        createTableWithPartitions(partitionedTable, baseDir, partitions)
+
+        // Check table storage type is PARQUET
+        val hiveResultTable =
+          catalog.getTableMetadata(TableIdentifier(partitionedTable, Some("default")))
+        assert(DDLUtils.isHiveTable(hiveResultTable))
+        assert(hiveResultTable.tableType == CatalogTableType.EXTERNAL)
+        assert(hiveResultTable.storage.inputFormat
+          .contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")
+        )
+        assert(hiveResultTable.storage.outputFormat
+          .contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")
+        )
+        assert(hiveResultTable.storage.serde
+          .contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+        )
+
+        // Check table has correct partitions
+        assert(
+          catalog.listPartitions(TableIdentifier(partitionedTable,
+            Some("default"))).map(_.spec).toSet == partitions.map(_.toSpecAsMap).toSet
+        )
+
+        // Check first table partition storage type is PARQUET
+        val parquetPartition = catalog.getPartition(
+          TableIdentifier(partitionedTable, Some("default")),
+          partitions.head.toSpecAsMap
+        )
+        assert(
+          parquetPartition.storage.serde
+            .contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+        )
+
+        // Check second table partition storage type is AVRO
+        val avroPartition = catalog.getPartition(
+          TableIdentifier(partitionedTable, Some("default")),
+          partitions.last.toSpecAsMap
+        )
+        assert(
+          avroPartition.storage.serde.contains("org.apache.hadoop.hive.serde2.avro.AvroSerDe")
+        )
+
+        assert(
+          avroPartition.storage.inputFormat
+            .contains("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat")
+        )
+      }
+    }
+  }
+
+
+  private def createMultiformatPartitionDefinitions(baseDir: File): List[PartitionDefinition] = {
+    val basePath =
baseDir.getCanonicalPath + val partitionPath_part1 = new File(basePath + s"/$partitionCol=$partitionVal1") + val partitionPath_part2 = new File(basePath + s"/$partitionCol=$partitionVal2") + + List( + PartitionDefinition( + partitionCol, partitionVal1, partitionPath_part1.toURI, format = Some("PARQUET") + ), + PartitionDefinition( + partitionCol, partitionVal2, partitionPath_part2.toURI, format = Some("AVRO") + ) + ) + } + + private def createParquetPartitionDefinitions(baseDir: File): List[PartitionDefinition] = { + val basePath = baseDir.getCanonicalPath + val partitionPath_part1 = new File(basePath + s"/$partitionCol=$partitionVal1") + val partitionPath_part2 = new File(basePath + s"/$partitionCol=$partitionVal2") + + List( + PartitionDefinition( + partitionCol, partitionVal1, partitionPath_part1.toURI, format = Some("PARQUET") + ), + PartitionDefinition( + partitionCol, partitionVal2, partitionPath_part2.toURI, format = Some("PARQUET") + ) + ) + } + + private def createTableWithPartitions(table: String, + baseDir: File, + partitions: List[PartitionDefinition], + avro: Boolean = false + ): Unit = { + if (avro) { + createAvroExternalTable(table, baseDir.toURI) + } else { + createParquetExternalTable(table, baseDir.toURI) + } + addPartitions(table, partitions) + partitions.foreach(p => setPartitionFormat(table, p)) + } + + private def createAvroCheckTable(avroTable: String, partition: PartitionDefinition): Unit = { + // The only valid way to insert avro data into the avro partition + // is to create a new avro table directly on the location of the avro partition + val avroSchema = + """{ + | "name": "baseRecord", + | "type": "record", + | "fields": [{ + | "name": "key", + | "type": ["null", "int"], + | "default": null + | }, + | { + | "name": "value", + | "type": ["null", "string"], + | "default": null + | }] + |} + """.stripMargin + + // Creates the Avro table + sql( + s""" + |CREATE TABLE $avroTable + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' + |LOCATION '${partition.location}' + |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema') + """.stripMargin + ) + } + + private def createPqCheckTable(pqTable: String, partition: PartitionDefinition): Unit = { + + // Creates the Parquet table + sql( + s""" + |CREATE TABLE $pqTable (key INT, value STRING) + |STORED AS PARQUET + |LOCATION '${partition.location}' + """.stripMargin + ) + } + + private def createParquetExternalTable(table: String, location: URI): DataFrame = { + sql( + s""" + |CREATE EXTERNAL TABLE $table (key INT, value STRING) + |PARTITIONED BY (dt STRING) + |STORED AS PARQUET + |LOCATION '$location' + """.stripMargin + ) + } + + private def createAvroExternalTable(table: String, location: URI): DataFrame = { + val avroSchema = + """{ + | "name": "baseRecord", + | "type": "record", + | "fields": [{ + | "name": "key", + | "type": ["null", "int"], + | "default": null + | }, + | { + | "name": "value", + | "type": ["null", "string"], + | "default": null + | }] + |} + """.stripMargin + sql( + s""" + |CREATE EXTERNAL TABLE $table (key INT, value STRING) + |PARTITIONED BY (dt STRING) + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' + |LOCATION '$location' + |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema') + """.stripMargin + ) 
+ } + + private def addPartitions(table: String, partitionDefs: List[PartitionDefinition]): DataFrame = { + val partitions = partitionDefs + .map(definition => s"PARTITION ${definition.toSpec} LOCATION '${definition.location}'") + .mkString("\n") + + sql( + s""" + |ALTER TABLE $table ADD + |$partitions + """.stripMargin + ) + + } + + private def setPartitionFormat( + table: String, + partitionDef: PartitionDefinition + ): DataFrame = { + sql( + s""" + |ALTER TABLE $table + |PARTITION ${partitionDef.toSpec} SET FILEFORMAT ${partitionDef.format.get} + """.stripMargin + ) + } +} From 393416991562cf73d5304cc39c727cad53b2e610 Mon Sep 17 00:00:00 2001 From: Kris Geusebroek Date: Fri, 27 Jul 2018 15:22:01 +0200 Subject: [PATCH 2/4] Support selecting from partitioned tables with different data formats --- .../spark/sql/hive/HiveStrategies.scala | 4 +- .../sql/hive/client/HiveClientImpl.scala | 2 + .../execution/MultiFormatTableSuite.scala | 228 ++++++++++++++++++ 3 files changed, 233 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 9fe83bb332a9a..3afedb8331436 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -182,7 +182,9 @@ case class RelationConversions( sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] { private def isConvertible(relation: HiveTableRelation): Boolean = { val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) - serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) || + val hasMultiFormatPartitions = relation.tableMeta.hasMultiFormatPartitions + serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) && + (!hasMultiFormatPartitions) || serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index db8fd5a43d842..b22793cc034bb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -437,6 +437,8 @@ private[hive] class HiveClientImpl( throw new AnalysisException("Hive index table is not supported.") }, schema = schema, + hasMultiFormatPartitions = + shim.getAllPartitions(client, h).map(_.getInputFormatClass).distinct.size > 1, partitionColumnNames = partCols.map(_.name), // If the table is written by Spark, we will put bucketing information in table properties, // and will always overwrite the bucket spec in hive metastore by the bucketing information diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala index 4a966ed4f4c34..cb709674bc987 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala @@ -124,6 +124,219 @@ class MultiFormatTableSuite } } + test("create hive table with only parquet partitions - test plan") { + withTempDir { baseDir => + val partitionedTable = "ext_parquet_partition_table" + + val partitions = createParquetPartitionDefinitions(baseDir) + + 
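+      // Every partition shares the table's own Parquet format, so the metastore
+      // relation stays convertible and planning should yield a FileSourceScanExec.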
+      withTable(partitionedTable) {
+        assert(baseDir.listFiles.isEmpty)
+
+        createTableWithPartitions(partitionedTable, baseDir, partitions)
+
+        val selectQuery =
+          s"""
+             |SELECT key, value FROM ${partitionedTable}
+          """.stripMargin
+
+        // Plan the query through the session (parsePlan alone yields an unresolved
+        // logical plan, which has no queryExecution to inspect)
+        val plan = sql(selectQuery)
+
+        plan.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]) shouldNot equal(None)
+
+      }
+    }
+  }
+
+  test("create hive table with only avro partitions - test plan") {
+    withTempDir { baseDir =>
+      val partitionedTable = "ext_avro_partition_table"
+
+      val partitions = createAvroPartitionDefinitions(baseDir)
+
+      withTable(partitionedTable) {
+        assert(baseDir.listFiles.isEmpty)
+
+        createTableWithPartitions(partitionedTable, baseDir, partitions, avro = true)
+
+        val selectQuery =
+          s"""
+             |SELECT key, value FROM ${partitionedTable}
+          """.stripMargin
+
+        val plan = sql(selectQuery)
+
+        plan.queryExecution.sparkPlan.find(_.isInstanceOf[HiveTableScanExec]) shouldNot equal(None)
+
+      }
+    }
+  }
+
+  test("create hive avro table with multi format partitions containing correct data") {
+    withTempDir { baseDir =>
+      val partitionedTable = "ext_multiformat_partition_table_with_data"
+      val avroPartitionTable = "ext_avro_partition_table"
+      val pqPartitionTable = "ext_pq_partition_table"
+
+      val partitions = createMultiformatPartitionDefinitions(baseDir)
+
+      withTable(partitionedTable, avroPartitionTable, pqPartitionTable) {
+        assert(baseDir.listFiles.isEmpty)
+
+        createTableWithPartitions(partitionedTable, baseDir, partitions, avro = true)
+        createAvroCheckTable(avroPartitionTable, partitions.last)
+        createPqCheckTable(pqPartitionTable, partitions.head)
+
+        // INSERT OVERWRITE TABLE only works for the default table format.
+        // So we can use it here to insert data into the parquet partition
+        sql(
+          s"""
+             |INSERT OVERWRITE TABLE $pqPartitionTable
+             |SELECT 1 as id, 'a' as value
+          """.stripMargin)
+
+        val parquetData = spark.read.parquet(partitions.head.location.toString)
+        checkAnswer(parquetData, Row(1, "a"))
+
+        sql(
+          s"""
+             |INSERT OVERWRITE TABLE $avroPartitionTable
+             |SELECT 2, 'b'
+          """.stripMargin
+        )
+
+        // Directly reading from the avro table should yield correct results
+        val avroData = spark.read.table(avroPartitionTable)
+        checkAnswer(avroData, Row(2, "b"))
+
+        val parquetPartitionSelectQuery =
+          s"""
+             |SELECT key, value FROM ${partitionedTable}
+             |WHERE ${partitionCol}='${partitionVal1}'
+          """.stripMargin
+
+        val avroPartitionSelectQuery =
+          s"""
+             |SELECT key, value FROM ${partitionedTable}
+             |WHERE ${partitionCol}='${partitionVal2}'
+          """.stripMargin
+
+        val selectQuery =
+          s"""
+             |SELECT key, value FROM ${partitionedTable}
+          """.stripMargin
+
+        val avroPartitionData = sql(avroPartitionSelectQuery)
+        checkAnswer(avroPartitionData, Row(2, "b"))
+
+        val parquetPartitionData = sql(parquetPartitionSelectQuery)
+        checkAnswer(parquetPartitionData, Row(1, "a"))
+
+        val allData = sql(selectQuery)
+        checkAnswer(allData, Seq(Row(1, "a"), Row(2, "b")))
+
+      }
+    }
+  }
+
+  test("create hive table with multi format partitions - test plan") {
+    withTempDir { baseDir =>
+      val partitionedTable = "ext_multiformat_partition_table"
+
+      val partitions = createMultiformatPartitionDefinitions(baseDir)
+
+      withTable(partitionedTable) {
+        assert(baseDir.listFiles.isEmpty)
+
+        createTableWithPartitions(partitionedTable, baseDir, partitions)
+
+        val selectQuery =
+          s"""
+             |SELECT key, value FROM ${partitionedTable}
+          """.stripMargin
+
+        val plan = sql(selectQuery)
+
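+        // Mixed Parquet/Avro partitions mark the table with hasMultiFormatPartitions,
+        // which disables the metastore Parquet conversion, so a Hive scan is expected.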
plan.queryExecution.sparkPlan.find(_.isInstanceOf[HiveTableScanExec]) shouldNot equal(None) + + } + } + } + + test("create hive table with multi format partitions containing correct data") { + withTempDir { baseDir => + // withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") { + val partitionedTable = "ext_multiformat_partition_table_with_data" + val avroPartitionTable = "ext_avro_partition_table" + val pqPartitionTable = "ext_pq_partition_table" + + val partitions = createMultiformatPartitionDefinitions(baseDir) + + withTable(partitionedTable, avroPartitionTable, pqPartitionTable) { + assert(baseDir.listFiles.isEmpty) + + createTableWithPartitions(partitionedTable, baseDir, partitions) + createAvroCheckTable(avroPartitionTable, partitions.last) + createPqCheckTable(pqPartitionTable, partitions.head) + + // INSERT OVERWRITE TABLE only works for the default table format. + // So we can use it here to insert data into the parquet partition + sql( + s""" + |INSERT OVERWRITE TABLE $pqPartitionTable + |SELECT 1 as id, 'a' as value + """.stripMargin) + + val parquetData = spark.read.parquet(partitions.head.location.toString) + checkAnswer(parquetData, Row(1, "a")) + + sql( + s""" + |INSERT OVERWRITE TABLE $avroPartitionTable + |SELECT 2, 'b' + """.stripMargin + ) + + // Directly reading from the avro table should yield correct results + val avroData = spark.read.table(avroPartitionTable) + checkAnswer(avroData, Row(2, "b")) + + val parquetPartitionSelectQuery = + s""" + |SELECT key, value FROM ${partitionedTable} + |WHERE ${partitionCol}='${partitionVal1}' + """.stripMargin + + val parquetPartitionData = sql(parquetPartitionSelectQuery) + checkAnswer(parquetPartitionData, Row(1, "a")) + + val avroPartitionSelectQuery = + s""" + |SELECT key, value FROM ${partitionedTable} + |WHERE ${partitionCol}='${partitionVal2}' + """.stripMargin + + val avroCheckTableSelectQuery = + s""" + |SELECT key, value FROM ${avroPartitionTable} + """.stripMargin + + val selectQuery = + s""" + |SELECT key, value FROM ${partitionedTable} + """.stripMargin + + // Selecting data from the partition currently fails because it tries to + // read avro data with parquet reader + val avroPartitionData = sql(avroPartitionSelectQuery) + checkAnswer(avroPartitionData, Row(2, "b")) + + val allData = sql(selectQuery) + checkAnswer(allData, Seq(Row(1, "a"), Row(2, "b"))) + } + // } + } + } private def createMultiformatPartitionDefinitions(baseDir: File): List[PartitionDefinition] = { val basePath = baseDir.getCanonicalPath @@ -155,6 +368,21 @@ class MultiFormatTableSuite ) } + private def createAvroPartitionDefinitions(baseDir: File): List[PartitionDefinition] = { + val basePath = baseDir.getCanonicalPath + val partitionPath_part1 = new File(basePath + s"/$partitionCol=$partitionVal1") + val partitionPath_part2 = new File(basePath + s"/$partitionCol=$partitionVal2") + + List( + PartitionDefinition( + partitionCol, partitionVal1, partitionPath_part1.toURI, format = Some("AVRO") + ), + PartitionDefinition( + partitionCol, partitionVal2, partitionPath_part2.toURI, format = Some("AVRO") + ) + ) + } + private def createTableWithPartitions(table: String, baseDir: File, partitions: List[PartitionDefinition], From 347269d0a077058c828336467d6936efe83ed64d Mon Sep 17 00:00:00 2001 From: Kris Geusebroek Date: Sun, 29 Jul 2018 23:55:13 +0200 Subject: [PATCH 3/4] Processed review comments --- .../org/apache/spark/sql/catalyst/parser/SqlBase.g4 | 1 - .../spark/sql/catalyst/parser/ParserUtils.scala | 3 +++ 
.../apache/spark/sql/execution/SparkSqlParser.scala | 6 ++++-- .../org/apache/spark/sql/execution/command/ddl.scala | 6 +++--- .../sql/hive/execution/MultiFormatTableSuite.scala | 12 +++++++----- 5 files changed, 17 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index a6da481a619af..ea6e09446f14d 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -211,7 +211,6 @@ unsupportedHiveNativeCommands | kw1=ALTER kw2=TABLE tableIdentifier kw3=TOUCH | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=COMPACT | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=CONCATENATE - | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=SET kw4=FILEFORMAT | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=REPLACE kw4=COLUMNS | kw1=START kw2=TRANSACTION | kw1=COMMIT diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala index 89347f4b1f7bf..23523293c81e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala @@ -96,6 +96,9 @@ object ParserUtils { } } + def extraMethod(s: String): String = { + s + } /** * Register the origin of the context. Any TreeNode created in the closure will be assigned the * registered origin. This method restores the previously set origin after completion of the diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index b968ae581296c..7fbe66ed15129 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -874,12 +874,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { case (c: GenericFileFormatContext) => visitGenericFileFormat(c) case _ => - throw new ParseException("Expected STORED AS ", ctx) + throw new ParseException( + "SET table/partition format expects valid file format ", + ctx + ) } AlterTableFormatCommand( visitTableIdentifier(ctx.tableIdentifier), format, - // TODO a partition spec is allowed to have optional values. This is currently violated. 
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index de78bf0f4d9bf..b25afec6327fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -424,9 +424,9 @@ case class AlterTableSerDePropertiesCommand( * }}} */ case class AlterTableFormatCommand( - tableName: TableIdentifier, - format: CatalogStorageFormat, - partSpec: Option[TablePartitionSpec]) + tableName: TableIdentifier, + format: CatalogStorageFormat, + partSpec: Option[TablePartitionSpec]) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala index cb709674bc987..b194f2c414bf1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala @@ -50,12 +50,14 @@ class MultiFormatTableSuite val partitionCol = "dt" val partitionVal1 = "2018-01-26" val partitionVal2 = "2018-01-27" + private case class PartitionDefinition( - column: String, - value: String, - location: URI, - format: Option[String] = None - ) { + column: String, + value: String, + location: URI, + format: Option[String] = None + ) { + def toSpec: String = { s"($column='$value')" } From e9d77f00bad884b19ca397e17ec7514de4588b26 Mon Sep 17 00:00:00 2001 From: Kris Geusebroek Date: Mon, 30 Jul 2018 17:59:39 +0200 Subject: [PATCH 4/4] Removed unneeded code and comments on review --- .../sql/catalyst/parser/ParserUtils.scala | 3 - .../execution/MultiFormatTableSuite.scala | 62 ++++++------------- 2 files changed, 19 insertions(+), 46 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala index 23523293c81e6..89347f4b1f7bf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala @@ -96,9 +96,6 @@ object ParserUtils { } } - def extraMethod(s: String): String = { - s - } /** * Register the origin of the context. Any TreeNode created in the closure will be assigned the * registered origin. 
This method restores the previously set origin after completion of the diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala index b194f2c414bf1..71af3bed30dc4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala @@ -57,7 +57,6 @@ class MultiFormatTableSuite location: URI, format: Option[String] = None ) { - def toSpec: String = { s"($column='$value')" } @@ -195,7 +194,8 @@ class MultiFormatTableSuite s""" |INSERT OVERWRITE TABLE $pqPartitionTable |SELECT 1 as id, 'a' as value - """.stripMargin) + """.stripMargin + ) val parquetData = spark.read.parquet(partitions.head.location.toString) checkAnswer(parquetData, Row(1, "a")) @@ -244,7 +244,6 @@ class MultiFormatTableSuite test("create hive table with multi format partitions - test plan") { withTempDir { baseDir => val partitionedTable = "ext_multiformat_partition_table" - val partitions = createMultiformatPartitionDefinitions(baseDir) withTable(partitionedTable) { @@ -267,7 +266,6 @@ class MultiFormatTableSuite test("create hive table with multi format partitions containing correct data") { withTempDir { baseDir => - // withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") { val partitionedTable = "ext_multiformat_partition_table_with_data" val avroPartitionTable = "ext_avro_partition_table" val pqPartitionTable = "ext_pq_partition_table" @@ -287,7 +285,8 @@ class MultiFormatTableSuite s""" |INSERT OVERWRITE TABLE $pqPartitionTable |SELECT 1 as id, 'a' as value - """.stripMargin) + """.stripMargin + ) val parquetData = spark.read.parquet(partitions.head.location.toString) checkAnswer(parquetData, Row(1, "a")) @@ -336,60 +335,41 @@ class MultiFormatTableSuite val allData = sql(selectQuery) checkAnswer(allData, Seq(Row(1, "a"), Row(2, "b"))) } - // } } } - private def createMultiformatPartitionDefinitions(baseDir: File): List[PartitionDefinition] = { + private def createMultiformatPartitionDefinitions( + baseDir: File, + formats: List[Option[String]] = List(Some("PARQUET"), Some("AVRO")) + ): List[PartitionDefinition] = { val basePath = baseDir.getCanonicalPath val partitionPath_part1 = new File(basePath + s"/$partitionCol=$partitionVal1") val partitionPath_part2 = new File(basePath + s"/$partitionCol=$partitionVal2") List( PartitionDefinition( - partitionCol, partitionVal1, partitionPath_part1.toURI, format = Some("PARQUET") + partitionCol, partitionVal1, partitionPath_part1.toURI, format = formats.head ), PartitionDefinition( - partitionCol, partitionVal2, partitionPath_part2.toURI, format = Some("AVRO") + partitionCol, partitionVal2, partitionPath_part2.toURI, format = formats.last ) ) } private def createParquetPartitionDefinitions(baseDir: File): List[PartitionDefinition] = { - val basePath = baseDir.getCanonicalPath - val partitionPath_part1 = new File(basePath + s"/$partitionCol=$partitionVal1") - val partitionPath_part2 = new File(basePath + s"/$partitionCol=$partitionVal2") - - List( - PartitionDefinition( - partitionCol, partitionVal1, partitionPath_part1.toURI, format = Some("PARQUET") - ), - PartitionDefinition( - partitionCol, partitionVal2, partitionPath_part2.toURI, format = Some("PARQUET") - ) - ) + createMultiformatPartitionDefinitions(baseDir, List(Some("PARQUET"), Some("PARQUET"))) } private def createAvroPartitionDefinitions(baseDir: 
File): List[PartitionDefinition] = { - val basePath = baseDir.getCanonicalPath - val partitionPath_part1 = new File(basePath + s"/$partitionCol=$partitionVal1") - val partitionPath_part2 = new File(basePath + s"/$partitionCol=$partitionVal2") - - List( - PartitionDefinition( - partitionCol, partitionVal1, partitionPath_part1.toURI, format = Some("AVRO") - ), - PartitionDefinition( - partitionCol, partitionVal2, partitionPath_part2.toURI, format = Some("AVRO") - ) - ) + createMultiformatPartitionDefinitions(baseDir, List(Some("AVRO"), Some("AVRO"))) } - private def createTableWithPartitions(table: String, - baseDir: File, - partitions: List[PartitionDefinition], - avro: Boolean = false - ): Unit = { + private def createTableWithPartitions( + table: String, + baseDir: File, + partitions: List[PartitionDefinition], + avro: Boolean = false + ): Unit = { if (avro) { createAvroExternalTable(table, baseDir.toURI) } else { @@ -434,7 +414,6 @@ class MultiFormatTableSuite } private def createPqCheckTable(pqTable: String, partition: PartitionDefinition): Unit = { - // Creates the Parquet table sql( s""" @@ -500,10 +479,7 @@ class MultiFormatTableSuite } - private def setPartitionFormat( - table: String, - partitionDef: PartitionDefinition - ): DataFrame = { + private def setPartitionFormat(table: String, partitionDef: PartitionDefinition): DataFrame = { sql( s""" |ALTER TABLE $table
         |PARTITION ${partitionDef.toSpec} SET FILEFORMAT ${partitionDef.format.get}
       """.stripMargin
     )
   }
 }
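---

For reference, a minimal usage sketch of the DDL this series adds. It assumes a
Hive-enabled SparkSession; the table `logs` and the partition values are
illustrative only and do not come from the patches above:

  import org.apache.spark.sql.SparkSession

  object MultiFormatExample {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .appName("multi-format-partitions")
        .enableHiveSupport()
        .getOrCreate()

      // Table-level form: rewrites the default storage format of the table.
      spark.sql("ALTER TABLE logs SET FILEFORMAT PARQUET")

      // Partition-level form: one day is stored as Avro, the rest stays Parquet.
      spark.sql("ALTER TABLE logs PARTITION (dt='2018-01-27') SET FILEFORMAT AVRO")

      // With mixed formats the table reports hasMultiFormatPartitions = true, so
      // RelationConversions keeps the Hive reader and each partition is read with
      // its own input format.
      spark.sql("SELECT count(*) FROM logs").show()

      spark.stop()
    }
  }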