From 5bd3d29f9fa118719c94d1f5acffa24d6f1a755d Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sat, 5 Jul 2014 21:59:01 -0700 Subject: [PATCH 1/4] Fixed Parquet log level --- .../main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala index f9046368e7ced..df3d426996414 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -401,7 +401,7 @@ private[parquet] object ParquetTypesConverter extends Logging { } else { val attributes = convertToAttributes( readMetaData(origPath, conf).getFileMetaData.getSchema) - log.warn(s"Falling back to schema conversion from Parquet types; result: $attributes") + log.info(s"Falling back to schema conversion from Parquet types; result: $attributes") attributes } } From 1c0d1b923a57fddd1fe67270c71e28ac0324de04 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 8 Jul 2014 18:53:38 -0700 Subject: [PATCH 2/4] Accelerated Parquet schema retrieving --- .../sql/parquet/ParquetTableSupport.scala | 27 +++++++++-------- .../spark/sql/parquet/ParquetTypes.scala | 30 +++++++++++-------- 2 files changed, 32 insertions(+), 25 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index 9cd5dc5bbd393..0776dd14fbc61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -17,20 +17,19 @@ package org.apache.spark.sql.parquet -import org.apache.hadoop.conf.Configuration +import java.util.{HashMap => JHashMap} +import org.apache.hadoop.conf.Configuration import parquet.column.ParquetProperties import parquet.hadoop.ParquetOutputFormat import parquet.hadoop.api.ReadSupport.ReadContext import parquet.hadoop.api.{ReadSupport, WriteSupport} import parquet.io.api._ -import parquet.schema.{MessageType, MessageTypeParser} +import parquet.schema.MessageType import org.apache.spark.Logging import org.apache.spark.sql.catalyst.expressions.{Attribute, Row} import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.execution.SparkSqlSerializer -import com.google.common.io.BaseEncoding /** * A `parquet.io.api.RecordMaterializer` for Rows. @@ -93,8 +92,8 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging { configuration: Configuration, keyValueMetaData: java.util.Map[String, String], fileSchema: MessageType): ReadContext = { - var parquetSchema: MessageType = fileSchema - var metadata: java.util.Map[String, String] = new java.util.HashMap[String, String]() + var parquetSchema = fileSchema + val metadata = new JHashMap[String, String]() val requestedAttributes = RowReadSupport.getRequestedSchema(configuration) if (requestedAttributes != null) { @@ -109,7 +108,7 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging { metadata.put(RowReadSupport.SPARK_METADATA_KEY, origAttributesStr) } - return new ReadSupport.ReadContext(parquetSchema, metadata) + new ReadSupport.ReadContext(parquetSchema, metadata) } } @@ -132,13 +131,17 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { private[parquet] var attributes: Seq[Attribute] = null override def init(configuration: Configuration): WriteSupport.WriteContext = { - attributes = if (attributes == null) RowWriteSupport.getSchema(configuration) else attributes - + val origAttributesStr: String = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA) + val metadata = new JHashMap[String, String]() + metadata.put(RowReadSupport.SPARK_METADATA_KEY, origAttributesStr) + + if (attributes == null) { + attributes = ParquetTypesConverter.convertFromString(origAttributesStr) + } + log.debug(s"write support initialized for requested schema $attributes") ParquetRelation.enableLogForwarding() - new WriteSupport.WriteContext( - ParquetTypesConverter.convertFromAttributes(attributes), - new java.util.HashMap[java.lang.String, java.lang.String]()) + new WriteSupport.WriteContext(ParquetTypesConverter.convertFromAttributes(attributes), metadata) } override def prepareForWrite(recordConsumer: RecordConsumer): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala index df3d426996414..9396b19bcea00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -22,6 +22,7 @@ import java.io.IOException import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter import parquet.hadoop.{ParquetFileReader, Footer, ParquetFileWriter} import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData} @@ -365,20 +366,23 @@ private[parquet] object ParquetTypesConverter extends Logging { s"Expected $path for be a directory with Parquet files/metadata") } ParquetRelation.enableLogForwarding() - val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE) - // if this is a new table that was just created we will find only the metadata file - if (fs.exists(metadataPath) && fs.isFile(metadataPath)) { - ParquetFileReader.readFooter(conf, metadataPath) - } else { - // there may be one or more Parquet files in the given directory - val footers = ParquetFileReader.readFooters(conf, fs.getFileStatus(path)) - // TODO: for now we assume that all footers (if there is more than one) have identical - // metadata; we may want to add a check here at some point - if (footers.size() == 0) { - throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path") - } - footers(0).getParquetMetadata + + val children = fs.listStatus(path).filterNot { + _.getPath.getName == FileOutputCommitter.SUCCEEDED_FILE_NAME } + + // NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row + // groups. Since Parquet schema is replicated among all row groups, we only need to touch a + // single row group to read schema related metadata. + children + // Try any non-"_metadata" file first... + .find(_.getPath.getName != ParquetFileWriter.PARQUET_METADATA_FILE) + // ... and fallback to "_metadata" if no such file exists (which implies the Parquet file is + // empty, thus normally the "_metadata" file is expected to be fairly small). + .orElse(children.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)) + .map(ParquetFileReader.readFooter(conf, _)) + .getOrElse( + throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path")) } /** From d2c4417a45dff48ad52a830695f9d68f9ed8531f Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 10 Jul 2014 13:17:57 -0700 Subject: [PATCH 3/4] Worked around PARQUET-16 to improve Parquet performance --- .../sql/parquet/ParquetTableOperations.scala | 115 ++++++++++++++---- 1 file changed, 91 insertions(+), 24 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index ade823b51c9cd..ea74320d06c86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -17,27 +17,34 @@ package org.apache.spark.sql.parquet +import scala.collection.JavaConversions._ +import scala.collection.mutable +import scala.util.Try + import java.io.IOException +import java.lang.{Long => JLong} import java.text.SimpleDateFormat -import java.util.Date +import java.util.{Date, List => JList} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} -import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat, FileOutputCommitter} +import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter -import parquet.hadoop.{ParquetRecordReader, ParquetInputFormat, ParquetOutputFormat} -import parquet.hadoop.api.ReadSupport +import parquet.hadoop._ +import parquet.hadoop.api.{InitContext, ReadSupport} +import parquet.hadoop.metadata.GlobalMetaData import parquet.hadoop.util.ContextUtil -import parquet.io.InvalidRecordException +import parquet.io.ParquetDecodingException import parquet.schema.MessageType -import org.apache.spark.{Logging, SerializableWritable, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row} import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode} +import org.apache.spark.{Logging, SerializableWritable, TaskContext} /** * Parquet table scan operator. Imports the file that backs the given @@ -55,16 +62,14 @@ case class ParquetTableScan( override def execute(): RDD[Row] = { val sc = sqlContext.sparkContext val job = new Job(sc.hadoopConfiguration) - ParquetInputFormat.setReadSupportClass( - job, - classOf[org.apache.spark.sql.parquet.RowReadSupport]) + ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport]) + val conf: Configuration = ContextUtil.getConfiguration(job) - val fileList = FileSystemHelper.listFiles(relation.path, conf) - // add all paths in the directory but skip "hidden" ones such - // as "_SUCCESS" and "_metadata" - for (path <- fileList if !path.getName.startsWith("_")) { - NewFileInputFormat.addInputPath(job, path) + val qualifiedPath = { + val path = new Path(relation.path) + path.getFileSystem(conf).makeQualified(path) } + NewFileInputFormat.addInputPath(job, qualifiedPath) // Store both requested and original schema in `Configuration` conf.set( @@ -87,7 +92,7 @@ case class ParquetTableScan( sc.newAPIHadoopRDD( conf, - classOf[org.apache.spark.sql.parquet.FilteringParquetRowInputFormat], + classOf[FilteringParquetRowInputFormat], classOf[Void], classOf[Row]) .map(_._2) @@ -122,14 +127,7 @@ case class ParquetTableScan( private def validateProjection(projection: Seq[Attribute]): Boolean = { val original: MessageType = relation.parquetSchema val candidate: MessageType = ParquetTypesConverter.convertFromAttributes(projection) - try { - original.checkContains(candidate) - true - } catch { - case e: InvalidRecordException => { - false - } - } + Try(original.checkContains(candidate)).isSuccess } } @@ -302,6 +300,11 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int) */ private[parquet] class FilteringParquetRowInputFormat extends parquet.hadoop.ParquetInputFormat[Row] with Logging { + + private var footers: JList[Footer] = _ + + private var fileStatuses= Map.empty[Path, FileStatus] + override def createRecordReader( inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): RecordReader[Void, Row] = { @@ -318,6 +321,70 @@ private[parquet] class FilteringParquetRowInputFormat new ParquetRecordReader[Row](readSupport) } } + + override def getFooters(jobContext: JobContext): JList[Footer] = { + if (footers eq null) { + val statuses = listStatus(jobContext) + fileStatuses = statuses.map(file => file.getPath -> file).toMap + footers = getFooters(ContextUtil.getConfiguration(jobContext), statuses) + } + + footers + } + + // TODO Remove this method and related code once PARQUET-16 is fixed + // This method together with the `getFooters` method and the `fileStatuses` field are just used + // to mimic this PR: https://github.com/apache/incubator-parquet-mr/pull/17 + override def getSplits( + configuration: Configuration, + footers: JList[Footer]): JList[ParquetInputSplit] = { + + val maxSplitSize: JLong = configuration.getLong("mapred.max.split.size", Long.MaxValue) + val minSplitSize: JLong = + Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L)) + if (maxSplitSize < 0 || minSplitSize < 0) { + throw new ParquetDecodingException( + s"maxSplitSize or minSplitSie should not be negative: maxSplitSize = $maxSplitSize;" + + s" minSplitSize = $minSplitSize") + } + + val getGlobalMetaData = + classOf[ParquetFileWriter].getDeclaredMethod("getGlobalMetaData", classOf[JList[Footer]]) + getGlobalMetaData.setAccessible(true) + val globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData] + + val readContext = getReadSupport(configuration).init( + new InitContext(configuration, + globalMetaData.getKeyValueMetaData(), + globalMetaData.getSchema())) + + val generateSplits = + classOf[ParquetInputFormat[_]].getDeclaredMethods.find(_.getName == "generateSplits").get + generateSplits.setAccessible(true) + + val splits = mutable.ArrayBuffer.empty[ParquetInputSplit] + for (footer <- footers) { + val fs = footer.getFile.getFileSystem(configuration) + val file = footer.getFile + val fileStatus = fileStatuses.getOrElse(file, fs.getFileStatus(file)) + val parquetMetaData = footer.getParquetMetadata + val blocks = parquetMetaData.getBlocks + val fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen) + splits.addAll( + generateSplits.invoke( + null, + blocks, + fileBlockLocations, + fileStatus, + parquetMetaData.getFileMetaData, + readContext.getRequestedSchema.toString, + readContext.getReadSupportMetadata, + minSplitSize, + maxSplitSize).asInstanceOf[JList[ParquetInputSplit]]) + } + + splits + } } private[parquet] object FileSystemHelper { From 94a2821a4e2d8ca31387bac79a390388a446fa0a Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 16 Jul 2014 11:34:34 +0800 Subject: [PATCH 4/4] Added comments about schema consistency --- .../main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala index 9396b19bcea00..25ad76addece2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -373,7 +373,8 @@ private[parquet] object ParquetTypesConverter extends Logging { // NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row // groups. Since Parquet schema is replicated among all row groups, we only need to touch a - // single row group to read schema related metadata. + // single row group to read schema related metadata. Notice that we are making assumptions that + // all data in a single Parquet file have the same schema, which is normally true. children // Try any non-"_metadata" file first... .find(_.getPath.getName != ParquetFileWriter.PARQUET_METADATA_FILE)