[SPARK-11412][SQL] Support merge schema for ORC
## What changes were proposed in this pull request?

Currently, ORC's `inferSchema` picks an arbitrary ORC file and reads only that file's schema.

This PR follows the behavior of Parquet: it implements schema-merging logic by reading the schemas of all ORC files in parallel through a Spark job.

Users can enable schema merging per read with `spark.read.option("mergeSchema", "true").orc("xxx")`, or globally by setting `spark.sql.orc.mergeSchema` to `true`; the per-read option takes precedence.
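
For illustration, a minimal sketch of both ways to enable merging (the path here is hypothetical):

```scala
// Per-read option (takes precedence over the session conf).
val df1 = spark.read
  .option("mergeSchema", "true")
  .orc("/path/to/orc")          // hypothetical path

// Session-level default for all ORC reads.
spark.conf.set("spark.sql.orc.mergeSchema", "true")
val df2 = spark.read.orc("/path/to/orc")
```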

## How was this patch tested?
Tested by unit tests in `OrcUtilsSuite.scala`.
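
As an illustrative sketch (not the actual test code; paths are hypothetical), the merged schema should be the union of the per-file schemas when the option is on:

```scala
import org.apache.spark.sql.functions.lit

// Two ORC files with different but compatible schemas under one dataset.
spark.range(3).withColumn("a", lit(1)).write.orc("/tmp/orc_merge/part1")  // columns: id, a
spark.range(3).withColumn("b", lit(2)).write.orc("/tmp/orc_merge/part2")  // columns: id, b

val merged = spark.read
  .option("mergeSchema", "true")
  .orc("/tmp/orc_merge/part1", "/tmp/orc_merge/part2")
merged.printSchema()  // expected columns: id, a, b
```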

Closes #24043 from WangGuangxin/SPARK-11412.

Lead-authored-by: wangguangxin.cn <wangguangxin.cn@gmail.com>
Co-authored-by: wangguangxin.cn <wangguangxin.cn@bytedance.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2 people authored and gatorsmile committed Jun 30, 2019
1 parent facf9c3 commit 73183b3
Showing 12 changed files with 375 additions and 83 deletions.
@@ -566,6 +566,12 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val ORC_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.orc.mergeSchema")
.doc("When true, the Orc data source merges schemas collected from all data files, " +
"otherwise the schema is picked from a random data file.")
.booleanConf
.createWithDefault(false)

val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
.doc("When true, check all the partition paths under the table\'s root directory " +
"when reading data stored in HDFS. This configuration will be deprecated in the future " +
@@ -1956,6 +1962,8 @@ class SQLConf extends Serializable with Logging {

def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)

def isOrcSchemaMergingEnabled: Boolean = getConf(ORC_SCHEMA_MERGING_ENABLED)

def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)

def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

object SchemaMergeUtils extends Logging {
  /**
   * Figures out a merged Parquet/ORC schema with a distributed Spark job.
   */
  def mergeSchemasInParallel(
      sparkSession: SparkSession,
      files: Seq[FileStatus],
      schemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType])
      : Option[StructType] = {
    val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())

    // !! HACK ALERT !!
    // Here is a hack for Parquet, but it can be used by ORC as well.
    //
    // Parquet requires `FileStatus`es to read footers.
    // Here we try to send cached `FileStatus`es to the executor side to avoid fetching them again.
    // However, `FileStatus` is not `Serializable` but only `Writable`. What makes it worse, for
    // some reason `FileStatus` doesn't play well with `SerializableWritable[T]` and always causes
    // a weird `IllegalStateException`. These facts virtually prevent us from serializing
    // `FileStatus`es.
    //
    // Since Parquet only relies on the path and length information of those `FileStatus`es to
    // read footers, here we just extract them (which can be easily serialized), send them to the
    // executor side, and reconstruct fake `FileStatus`es there.
    val partialFileStatusInfo = files.map(f => (f.getPath.toString, f.getLen))

    // Set the number of partitions to prevent the following schema reads from generating many
    // tasks when there is only a small number of files.
    val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
      sparkSession.sparkContext.defaultParallelism)

    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles

    // Issues a Spark job to read the Parquet/ORC schemas in parallel.
    val partiallyMergedSchemas =
      sparkSession
        .sparkContext
        .parallelize(partialFileStatusInfo, numParallelism)
        .mapPartitions { iterator =>
          // Reconstructs fake `FileStatus`es from the serialized path and length information.
          val fakeFileStatuses = iterator.map { case (path, length) =>
            new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
          }.toSeq

          val schemas = schemaReader(fakeFileStatuses, serializedConf.value, ignoreCorruptFiles)

          if (schemas.isEmpty) {
            Iterator.empty
          } else {
            var mergedSchema = schemas.head
            schemas.tail.foreach { schema =>
              try {
                mergedSchema = mergedSchema.merge(schema)
              } catch { case cause: SparkException =>
                throw new SparkException(
                  s"Failed merging schema:\n${schema.treeString}", cause)
              }
            }
            Iterator.single(mergedSchema)
          }
        }.collect()

    if (partiallyMergedSchemas.isEmpty) {
      None
    } else {
      var finalSchema = partiallyMergedSchemas.head
      partiallyMergedSchemas.tail.foreach { schema =>
        try {
          finalSchema = finalSchema.merge(schema)
        } catch { case cause: SparkException =>
          throw new SparkException(
            s"Failed merging schema:\n${schema.treeString}", cause)
        }
      }
      Some(finalSchema)
    }
  }
}
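
For orientation, a hedged sketch of how a caller supplies the `schemaReader` argument. The real call sites are the ORC and Parquet changes further down in this diff; `toySchemaReader` and `inferMergedSchema` below are illustrative names, not part of the patch:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.SchemaMergeUtils
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Placeholder reader: pretends every file exposes the same one-column schema.
// A real reader would open each file using `conf` (e.g. read an ORC footer) and,
// when `ignoreCorruptFiles` is true, skip files it cannot read.
val toySchemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType] =
  (files, conf, ignoreCorruptFiles) =>
    files.map(_ => StructType(StructField("id", LongType) :: Nil))

def inferMergedSchema(spark: SparkSession, files: Seq[FileStatus]): Option[StructType] =
  SchemaMergeUtils.mergeSchemasInParallel(spark, files, toySchemaReader)
```
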
@@ -94,7 +94,7 @@ class OrcFileFormat
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
OrcUtils.readSchema(sparkSession, files)
OrcUtils.inferSchema(sparkSession, files, options)
}

override def prepareWrite(
@@ -57,9 +57,20 @@ class OrcOptions(
}
shortOrcCompressionCodecNames(codecName)
}

/**
* Whether it merges schemas or not. When the given Orc files have different schemas,
* the schemas can be merged. By default use the value specified in SQLConf.
*/
val mergeSchema: Boolean = parameters
.get(MERGE_SCHEMA)
.map(_.toBoolean)
.getOrElse(sqlConf.isOrcSchemaMergingEnabled)
}

object OrcOptions {
val MERGE_SCHEMA = "mergeSchema"

// The ORC compression short names
private val shortOrcCompressionCodecNames = Map(
"none" -> "NONE",
@@ -33,7 +33,9 @@ import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession}
import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.SchemaMergeUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}

object OrcUtils extends Logging {

@@ -82,14 +84,36 @@ object OrcUtils extends Logging {
: Option[StructType] = {
val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
val conf = sparkSession.sessionState.newHadoopConf()
// TODO: We need to support merge schema. Please see SPARK-11412.
files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
case Some(schema) =>
logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
}
}

/**
* Reads ORC file schemas in a multi-threaded manner, using the native version of ORC.
* This is visible for testing.
*/
def readOrcSchemasInParallel(
files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean): Seq[StructType] = {
ThreadUtils.parmap(files, "readingOrcSchemas", 8) { currentFile =>
OrcUtils.readSchema(currentFile.getPath, conf, ignoreCorruptFiles)
.map(s => CatalystSqlParser.parseDataType(s.toString).asInstanceOf[StructType])
}.flatten
}

def inferSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: Map[String, String])
: Option[StructType] = {
val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
if (orcOptions.mergeSchema) {
SchemaMergeUtils.mergeSchemasInParallel(
sparkSession, files, OrcUtils.readOrcSchemasInParallel)
} else {
OrcUtils.readSchema(sparkSession, files)
}
}

/**
* Returns the requested column ids from the given ORC file. Column id can be -1, which means the
* requested column doesn't exist in the ORC file. Returns None if the given ORC file is empty.
@@ -476,79 +476,18 @@ object ParquetFileFormat extends Logging {
sparkSession: SparkSession): Option[StructType] = {
val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())

// !! HACK ALERT !!
//
// Parquet requires `FileStatus`es to read footers. Here we try to send cached `FileStatus`es
// to executor side to avoid fetching them again. However, `FileStatus` is not `Serializable`
// but only `Writable`. What makes it worse, for some reason, `FileStatus` doesn't play well
// with `SerializableWritable[T]` and always causes a weird `IllegalStateException`. These
// facts virtually prevents us to serialize `FileStatus`es.
//
// Since Parquet only relies on path and length information of those `FileStatus`es to read
// footers, here we just extract them (which can be easily serialized), send them to executor
// side, and resemble fake `FileStatus`es there.
val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))

// Set the number of partitions to prevent following schema reads from generating many tasks
// in case of a small number of parquet files.
val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
sparkSession.sparkContext.defaultParallelism)

val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles

// Issues a Spark job to read Parquet schema in parallel.
val partiallyMergedSchemas =
sparkSession
.sparkContext
.parallelize(partialFileStatusInfo, numParallelism)
.mapPartitions { iterator =>
// Resembles fake `FileStatus`es with serialized path and length information.
val fakeFileStatuses = iterator.map { case (path, length) =>
new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
}.toSeq

// Reads footers in multi-threaded manner within each task
val footers =
ParquetFileFormat.readParquetFootersInParallel(
serializedConf.value, fakeFileStatuses, ignoreCorruptFiles)

// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
val converter = new ParquetToSparkSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp)
if (footers.isEmpty) {
Iterator.empty
} else {
var mergedSchema = ParquetFileFormat.readSchemaFromFooter(footers.head, converter)
footers.tail.foreach { footer =>
val schema = ParquetFileFormat.readSchemaFromFooter(footer, converter)
try {
mergedSchema = mergedSchema.merge(schema)
} catch { case cause: SparkException =>
throw new SparkException(
s"Failed merging schema of file ${footer.getFile}:\n${schema.treeString}", cause)
}
}
Iterator.single(mergedSchema)
}
}.collect()

if (partiallyMergedSchemas.isEmpty) {
None
} else {
var finalSchema = partiallyMergedSchemas.head
partiallyMergedSchemas.tail.foreach { schema =>
try {
finalSchema = finalSchema.merge(schema)
} catch { case cause: SparkException =>
throw new SparkException(
s"Failed merging schema:\n${schema.treeString}", cause)
}
}
Some(finalSchema)
val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {
// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
val converter = new ParquetToSparkSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp)

readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
.map(ParquetFileFormat.readSchemaFromFooter(_, converter))
}

SchemaMergeUtils.mergeSchemasInParallel(sparkSession, filesToTouch, reader)
}

/**
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.execution.datasources.v2.orc

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.SparkSession
@@ -39,7 +41,7 @@ case class OrcTable(
new OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)

override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
OrcUtils.readSchema(sparkSession, files)
OrcUtils.inferSchema(sparkSession, files, options.asScala.toMap)

override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
new OrcWriteBuilder(options, paths, formatName, supportsDataType)
@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.datasources

import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.SQLConf

/**
@@ -32,6 +33,7 @@ import org.apache.spark.sql.internal.SQLConf
*
* -> OrcReadSchemaSuite
* -> VectorizedOrcReadSchemaSuite
* -> MergedOrcReadSchemaSuite
*
* -> ParquetReadSchemaSuite
* -> VectorizedParquetReadSchemaSuite
@@ -134,6 +136,25 @@ class VectorizedOrcReadSchemaSuite
}
}

class MergedOrcReadSchemaSuite
extends ReadSchemaSuite
with AddColumnIntoTheMiddleTest
with HideColumnInTheMiddleTest
with AddNestedColumnTest
with HideNestedColumnTest
with ChangePositionTest
with BooleanTypeTest
with IntegralTypeTest
with ToDoubleTypeTest {

override val format: String = "orc"

override protected def sparkConf: SparkConf =
super
.sparkConf
.set(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key, "true")
}

class ParquetReadSchemaSuite
extends ReadSchemaSuite
with AddColumnIntoTheMiddleTest
