[SPARK-22158][SQL][BRANCH-2.2] convertMetastore should not ignore table property #19417

Closed
wants to merge 1 commit
@@ -193,12 +193,12 @@ case class RelationConversions(
   private def convert(relation: HiveTableRelation): LogicalRelation = {
     val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
     if (serde.contains("parquet")) {
-      val options = Map(ParquetOptions.MERGE_SCHEMA ->
+      val options = relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
         conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
       sessionCatalog.metastoreCatalog
         .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
     } else {
-      val options = Map[String, String]()
+      val options = relation.tableMeta.storage.properties
       sessionCatalog.metastoreCatalog
         .convertToLogicalRelation(relation, options, classOf[OrcFileFormat], "orc")
     }
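The hunk above is the whole fix: convert previously built the data source options from scratch, so per-table storage properties (for example a compression codec declared in the table's OPTIONS) never reached ParquetFileFormat or OrcFileFormat. Now the table's storage properties are passed through, with the Parquet schema-merging flag still applied on top. A minimal sketch of the merge semantics, using plain Scala maps with illustrative values ("mergeSchema" stands in for ParquetOptions.MERGE_SCHEMA):

    // "compression" -> "ZLIB" stands in for an entry of tableMeta.storage.properties.
    val storageProperties = Map("compression" -> "ZLIB")
    val mergeSchema = "mergeSchema" -> "false"

    val oldOptions = Map(mergeSchema)                    // before: table properties silently dropped
    val newOptions = storageProperties + mergeSchema     // after: properties preserved, flag still set

    assert(oldOptions.get("compression").isEmpty)
    assert(newOptions("compression") == "ZLIB" && newOptions("mergeSchema") == "false")

Note that + on an immutable Map overwrites an existing key, so a table property that happened to collide with the schema-merging key would still be overridden by the session configuration.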
@@ -21,6 +21,8 @@ import java.io.File
 import java.net.URI
 
 import org.apache.hadoop.fs.Path
+import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
+import org.apache.parquet.hadoop.ParquetFileReader
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkException
@@ -30,6 +32,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
 import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
 import org.apache.spark.sql.hive.orc.OrcFileOperator
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -1438,12 +1441,8 @@ class HiveDDLSuite
sql("INSERT INTO t SELECT 1")
checkAnswer(spark.table("t"), Row(1))
// Check if this is compressed as ZLIB.
val maybeOrcFile = path.listFiles().find(!_.getName.endsWith(".crc"))
assert(maybeOrcFile.isDefined)
val orcFilePath = maybeOrcFile.get.toPath.toString
val expectedCompressionKind =
OrcFileOperator.getFileReader(orcFilePath).get.getCompression
assert("ZLIB" === expectedCompressionKind.name())
val maybeOrcFile = path.listFiles().find(_.getName.startsWith("part"))
assertCompression(maybeOrcFile, "orc", "ZLIB")

sql("CREATE TABLE t2 USING HIVE AS SELECT 1 AS c1, 'a' AS c2")
val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2"))
@@ -1941,4 +1940,47 @@ class HiveDDLSuite
       }
     }
   }
+
+  private def assertCompression(maybeFile: Option[File], format: String, compression: String) = {
+    assert(maybeFile.isDefined)
+
+    val actualCompression = format match {
+      case "orc" =>
+        OrcFileOperator.getFileReader(maybeFile.get.toPath.toString).get.getCompression.name
+
+      case "parquet" =>
+        val footer = ParquetFileReader.readFooter(
+          sparkContext.hadoopConfiguration, new Path(maybeFile.get.getPath), NO_FILTER)
+        footer.getBlocks.get(0).getColumns.get(0).getCodec.toString
+    }
+
+    assert(compression === actualCompression)
+  }
+
+  Seq(("orc", "ZLIB"), ("parquet", "GZIP")).foreach { case (fileFormat, compression) =>
+    test(s"SPARK-22158 convertMetastore should not ignore table property - $fileFormat") {
+      withSQLConf(CONVERT_METASTORE_ORC.key -> "true", CONVERT_METASTORE_PARQUET.key -> "true") {
+        withTable("t") {
+          withTempPath { path =>
+            sql(
+              s"""
+                |CREATE TABLE t(id int) USING hive
+                |OPTIONS(fileFormat '$fileFormat', compression '$compression')
+                |LOCATION '${path.toURI}'
+              """.stripMargin)
+            val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+            assert(DDLUtils.isHiveTable(table))
+            assert(table.storage.serde.get.contains(fileFormat))
+            assert(table.storage.properties.get("compression") == Some(compression))
+            assert(spark.table("t").collect().isEmpty)
+
+            sql("INSERT INTO t SELECT 1")
+            checkAnswer(spark.table("t"), Row(1))
+            val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
+            assertCompression(maybeFile, fileFormat, compression)
+          }
+        }
+      }
+    }
+  }
 }
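For context, a sketch of the user-visible behaviour the new tests lock down, assuming a Spark 2.2 session with the metastore conversion flags enabled as in the test above (table name and literal values are illustrative):

    // With spark.sql.hive.convertMetastoreOrc=true, INSERTs into this table go through
    // OrcFileFormat; before this patch the 'compression' table option was dropped during
    // the conversion, so the written files could end up with the session default codec
    // rather than ZLIB.
    spark.sql(
      """CREATE TABLE zlib_table(id INT) USING hive
        |OPTIONS(fileFormat 'orc', compression 'ZLIB')
      """.stripMargin)
    spark.sql("INSERT INTO zlib_table SELECT 1")

The new tests assert the codec directly from the written ORC and Parquet file footers, so a regression in either conversion path fails fast.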