From 057b6f2332d12229940ade4ed56bb39d931a46c8 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 24 Mar 2016 23:36:04 +0800 Subject: [PATCH 1/6] WIP --- .../datasources/DataSourceStrategy.scala | 6 +- .../execution/datasources/FileScanRDD.scala | 4 +- .../datasources/FileSourceStrategy.scala | 29 +++-- .../datasources/parquet/ParquetRelation.scala | 10 +- .../datasources/text/DefaultSource.scala | 1 - .../spark/sql/hive/orc/OrcFileOperator.scala | 2 - .../spark/sql/hive/orc/OrcRelation.scala | 104 ++++++++++++++++-- 7 files changed, 118 insertions(+), 38 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 4943702438d65..52c8f3ef0be73 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -208,9 +208,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val bucketedRDD = new UnionRDD(t.sqlContext.sparkContext, (0 until spec.numBuckets).map { bucketId => - bucketedDataMap.get(bucketId).getOrElse { - t.sqlContext.emptyResult: RDD[InternalRow] - } + bucketedDataMap.getOrElse(bucketId, t.sqlContext.emptyResult: RDD[InternalRow]) }) bucketedRDD } @@ -387,7 +385,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { result.setColumn(resultIdx, input.column(inputIdx)) inputIdx += 1 } else { - require(partitionColumnSchema.fields.filter(_.name.equals(attr.name)).length == 1) + require(partitionColumnSchema.fields.count(_.name == attr.name) == 1) var partitionIdx = 0 partitionColumnSchema.fields.foreach { f => { if (f.name.equals(attr.name)) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index bbe7f4abb18a6..988c785dbe618 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -32,7 +32,7 @@ case class PartitionedFile( filePath: String, start: Long, length: Long) { - override def toString(): String = { + override def toString: String = { s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues" } } @@ -44,7 +44,7 @@ case class PartitionedFile( * * TODO: This currently does not take locality information about the files into account. 
*/ -case class FilePartition(val index: Int, files: Seq[PartitionedFile]) extends Partition +case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends Partition class FileScanRDD( @transient val sqlContext: SQLContext, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index de89d5f1fc91f..4b04fec57d3a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{DataSourceScan, SparkPlan} import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types._ /** * A strategy for planning scans over collections of files that might be partitioned or bucketed @@ -56,9 +55,10 @@ import org.apache.spark.sql.types._ */ private[sql] object FileSourceStrategy extends Strategy with Logging { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case PhysicalOperation(projects, filters, l@LogicalRelation(files: HadoopFsRelation, _, _)) + case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _)) if (files.fileFormat.toString == "TestFileFormat" || - files.fileFormat.isInstanceOf[parquet.DefaultSource]) && + files.fileFormat.isInstanceOf[parquet.DefaultSource] || + files.fileFormat.toString == "ORC") && files.sqlContext.conf.parquetFileScan => // Filters on this relation fall into four categories based on where we can use them to avoid // reading unneeded data: @@ -81,10 +81,10 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { val bucketColumns = AttributeSet( files.bucketSpec - .map(_.bucketColumnNames) - .getOrElse(Nil) - .map(l.resolveQuoted(_, files.sqlContext.conf.resolver) - .getOrElse(sys.error("")))) + .map(_.bucketColumnNames) + .getOrElse(Nil) + .map(l.resolveQuoted(_, files.sqlContext.conf.resolver) + .getOrElse(sys.error("")))) // Partition keys are not available in the statistics of the files. 
val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty) @@ -101,8 +101,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { val readDataColumns = dataColumns - .filter(requiredAttributes.contains) - .filterNot(partitionColumns.contains) + .filter(requiredAttributes.contains) + .filterNot(partitionColumns.contains) val prunedDataSchema = readDataColumns.toStructType logInfo(s"Pruned Data Schema: ${prunedDataSchema.simpleString(5)}") @@ -120,13 +120,12 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { case Some(bucketing) if files.sqlContext.conf.bucketingEnabled => logInfo(s"Planning with ${bucketing.numBuckets} buckets") val bucketed = - selectedPartitions - .flatMap { p => - p.files.map(f => PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen)) - }.groupBy { f => + selectedPartitions.flatMap { p => + p.files.map(f => PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen)) + }.groupBy { f => BucketingUtils - .getBucketId(new Path(f.filePath).getName) - .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}")) + .getBucketId(new Path(f.filePath).getName) + .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}")) } (0 until bucketing.numBuckets).map { bucketId => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 2f2d438f327b8..d6b84be267411 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -321,11 +321,11 @@ private[sql] class DefaultSource // Try to push down filters when filter push-down is enabled. val pushed = if (sqlContext.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) { filters - // Collects all converted Parquet filter predicates. Notice that not all predicates can be - // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` - // is used here. - .flatMap(ParquetFilters.createFilter(dataSchema, _)) - .reduceOption(FilterApi.and) + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` + // is used here. 
+ .flatMap(ParquetFilters.createFilter(dataSchema, _)) + .reduceOption(FilterApi.and) } else { None } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala index 6af403dec5fba..5cfc9e9afad32 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.datasources.text -import com.google.common.base.Objects import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.{LongWritable, NullWritable, Text} import org.apache.hadoop.mapred.{JobConf, TextInputFormat} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala index 0195466946020..8248a112a0af4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala @@ -24,7 +24,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.hive.HiveMetastoreTypes import org.apache.spark.sql.types.StructType @@ -92,7 +91,6 @@ private[orc] object OrcFileOperator extends Logging { // TODO: Check if the paths coming in are already qualified and simplify. val origPath = new Path(pathStr) val fs = origPath.getFileSystem(conf) - val path = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory) val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath) .filterNot(_.isDirectory) .map(_.getPath) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index b5dc9106e236a..87c1420c6c686 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.orc import java.util.Properties +import scala.collection.JavaConverters._ + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hive.conf.HiveConf.ConfVars @@ -27,10 +29,10 @@ import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils} import org.apache.hadoop.io.{NullWritable, Writable} -import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter} -import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} +import org.apache.hadoop.mapred.{JobConf, RecordWriter, Reporter, InputFormat => MapRedInputFormat, OutputFormat => MapRedOutputFormat} import org.apache.hadoop.mapreduce.lib.input.FileInputFormat - +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.hadoop.mapreduce._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.rdd.{HadoopRDD, RDD} @@ -44,7 +46,8 @@ import org.apache.spark.sql.types.StructType import 
org.apache.spark.util.SerializableConfiguration import org.apache.spark.util.collection.BitSet -private[sql] class DefaultSource extends FileFormat with DataSourceRegister { +private[sql] class DefaultSource + extends FileFormat with DataSourceRegister with HiveInspectors with Serializable { override def shortName(): String = "orc" @@ -55,7 +58,9 @@ private[sql] class DefaultSource extends FileFormat with DataSourceRegister { options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { OrcFileOperator.readSchema( - files.map(_.getPath.toUri.toString), Some(sqlContext.sparkContext.hadoopConfiguration)) + files.map(_.getPath.toUri.toString), + Some(sqlContext.sparkContext.hadoopConfiguration) + ) } override def prepareWrite( @@ -80,8 +85,8 @@ private[sql] class DefaultSource extends FileFormat with DataSourceRegister { job.getConfiguration.set( OrcTableProperties.COMPRESSION.getPropName, OrcRelation - .shortOrcCompressionCodecNames - .getOrElse(codecName, CompressionKind.NONE).name()) + .shortOrcCompressionCodecNames + .getOrElse(codecName, CompressionKind.NONE).name()) } job.getConfiguration match { @@ -117,6 +122,87 @@ private[sql] class DefaultSource extends FileFormat with DataSourceRegister { val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes OrcTableScan(sqlContext, output, filters, inputFiles).execute() } + + override def buildReader( + sqlContext: SQLContext, + partitionSchema: StructType, + dataSchema: StructType, + filters: Seq[Filter], + options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = { + val orcConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + + if (sqlContext.conf.orcFilterPushDown) { + // Sets pushed predicates + OrcFilters.createFilter(filters.toArray).foreach { f => + orcConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo) + orcConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true) + } + } + + val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(orcConf)) + // Temporary variables used to avoid serialization issue + val _dataSchema = dataSchema + + (file: PartitionedFile) => { + val conf = broadcastedConf.value.value + + // Sets required columns + // TODO De-duplicates this part and `OrcFileScan.addColumnIds` + val physicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf)).getOrElse { + sys.error("Failed to read schema from target ORC files.") + } + val ids = _dataSchema.map(a => physicalSchema.fieldIndex(a.name): Integer) + val (sortedIDs, sortedNames) = ids.zip(_dataSchema.fieldNames).sorted.unzip + HiveShim.appendReadColumns(conf, sortedIDs, sortedNames) + + val recordReaders = { + val job = Job.getInstance(conf) + FileInputFormat.setInputPaths(job, file.filePath) + + val inputFormat = new OrcNewInputFormat + val splits = inputFormat.getSplits(job) + + splits.asScala.map { split => + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + inputFormat.createRecordReader(split, hadoopAttemptContext) + } + } + + val orcStructIterator = recordReaders.iterator.flatMap { + new RecordReaderIterator[OrcStruct](_) + } + + // TODO De-duplicates this part and `OrcFileScan.fillObject` + val deserializer = new OrcSerde + val maybeStructOI = OrcFileOperator.getObjectInspector(file.filePath, Some(conf)) + val mutableRow = new SpecificMutableRow(_dataSchema.map(_.dataType)) + val unsafeProjection = UnsafeProjection.create(_dataSchema) + + maybeStructOI.map { 
oi => + val (fieldRefs, fieldOrdinals) = _dataSchema.zipWithIndex.map { + case (field, ordinal) => oi.getStructFieldRef(field.name) -> ordinal + }.unzip + + val unwrappers = fieldRefs.map(unwrapperFor) + + orcStructIterator.map { value => + val raw = deserializer.deserialize(value) + var i = 0 + while (i < fieldRefs.length) { + val fieldValue = oi.getStructFieldData(raw, fieldRefs(i)) + if (fieldValue == null) { + mutableRow.setNullAt(fieldOrdinals(i)) + } else { + unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i)) + } + i += 1 + } + unsafeProjection(mutableRow) + } + }.getOrElse(Iterator.empty) + } + } } private[orc] class OrcOutputWriter( @@ -291,8 +377,8 @@ private[orc] case class OrcTableScan( val orcFormat = new DefaultSource val dataSchema = orcFormat - .inferSchema(sqlContext, Map.empty, inputPaths) - .getOrElse(sys.error("Failed to read schema from target ORC files.")) + .inferSchema(sqlContext, Map.empty, inputPaths) + .getOrElse(sys.error("Failed to read schema from target ORC files.")) // Sets requested columns addColumnIds(dataSchema, attributes, conf) From b1d630b08c013dd6a0fd709b418bed5b6d965377 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 25 Mar 2016 19:12:21 +0800 Subject: [PATCH 2/6] Fixes read path of partitioned ORC table and removes duplicated code --- .../spark/sql/hive/orc/OrcRelation.scala | 197 ++++++++---------- 1 file changed, 91 insertions(+), 106 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 87c1420c6c686..114b7206b7d30 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -26,19 +26,21 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.ql.io.orc._ import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties -import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector +import org.apache.hadoop.hive.serde2.objectinspector.{SettableStructObjectInspector, StructObjectInspector} import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils} import org.apache.hadoop.io.{NullWritable, Writable} import org.apache.hadoop.mapred.{JobConf, RecordWriter, Reporter, InputFormat => MapRedInputFormat, OutputFormat => MapRedOutputFormat} +import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.hadoop.mapreduce._ + import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.rdd.{HadoopRDD, RDD} import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hive.{HiveInspectors, HiveMetastoreTypes, HiveShim} import org.apache.spark.sql.sources.{Filter, _} @@ -47,7 +49,7 @@ import org.apache.spark.util.SerializableConfiguration import org.apache.spark.util.collection.BitSet private[sql] class DefaultSource - extends FileFormat with DataSourceRegister with HiveInspectors with Serializable { + extends FileFormat with DataSourceRegister with Serializable { override def shortName(): String = "orc" 
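The next hunk reworks the closure returned by `buildReader` (introduced in PATCH 1/6 above). The shape worth keeping in mind: everything before the `(file: PartitionedFile) => ...` literal runs once on the driver, and only the broadcast configuration and the schemas are captured by the closure that executors invoke per file. Below is a minimal standalone sketch of that split, using plain Scala stand-ins (`SimpleFile`, `readerConf`, string options) rather than Spark's `PartitionedFile`, `SerializableConfiguration`, and `InternalRow` — an illustration of the pattern only, not the actual implementation.

object BuildReaderSketch {
  final case class SimpleFile(path: String, start: Long, length: Long)

  // Driver side: resolve options and required columns once, then return the
  // per-file closure that each executor applies to the files it is assigned.
  def buildReader(
      options: Map[String, String],
      requiredColumns: Seq[String]): SimpleFile => Iterator[Map[String, Any]] = {
    val readerConf = options + ("columns" -> requiredColumns.mkString(","))

    (file: SimpleFile) => {
      // Executor side: a real reader would open only [start, start + length)
      // of this file and decode records; here we just echo the captured state.
      Iterator.single(Map(
        "path" -> file.path,
        "range" -> (file.start, file.start + file.length),
        "conf" -> readerConf))
    }
  }

  def main(args: Array[String]): Unit = {
    val read = buildReader(Map("orc.filter.pushdown" -> "true"), Seq("a", "b"))
    read(SimpleFile("/tmp/part-00000.orc", 0L, 128L)).foreach(println)
  }
}

The real code broadcasts a `SerializableConfiguration` for the same reason the sketch keeps `readerConf` small and immutable: everything captured by the closure has to be serialized to executors.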
@@ -140,67 +142,52 @@ private[sql] class DefaultSource } val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(orcConf)) - // Temporary variables used to avoid serialization issue - val _dataSchema = dataSchema (file: PartitionedFile) => { val conf = broadcastedConf.value.value - // Sets required columns - // TODO De-duplicates this part and `OrcFileScan.addColumnIds` - val physicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf)).getOrElse { - sys.error("Failed to read schema from target ORC files.") - } - val ids = _dataSchema.map(a => physicalSchema.fieldIndex(a.name): Integer) - val (sortedIDs, sortedNames) = ids.zip(_dataSchema.fieldNames).sorted.unzip - HiveShim.appendReadColumns(conf, sortedIDs, sortedNames) + // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this + // case, `OrcFileOperator.readSchema` returns `None`, and we can simply return an empty + // iterator. + val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf)) - val recordReaders = { - val job = Job.getInstance(conf) - FileInputFormat.setInputPaths(job, file.filePath) + maybePhysicalSchema.fold(Iterator.empty: Iterator[InternalRow]) { physicalSchema => + OrcRelation.setRequiredColumns(conf, physicalSchema, dataSchema) - val inputFormat = new OrcNewInputFormat - val splits = inputFormat.getSplits(job) + // Creates `OrcRecordReader`s using a dummy `OrcNewInputFormat` containing only a single + // input file. + val orcRecordReaders = { + val job = Job.getInstance(conf) + FileInputFormat.setInputPaths(job, file.filePath) - splits.asScala.map { split => - val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) - val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - inputFormat.createRecordReader(split, hadoopAttemptContext) + val inputFormat = new OrcNewInputFormat + val splits = inputFormat.getSplits(job) + + splits.asScala.map { split => + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + inputFormat.createRecordReader(split, hadoopAttemptContext) + } } - } - val orcStructIterator = recordReaders.iterator.flatMap { - new RecordReaderIterator[OrcStruct](_) - } + val orcStructIterator = orcRecordReaders.iterator.flatMap { + new RecordReaderIterator[OrcStruct](_) + } - // TODO De-duplicates this part and `OrcFileScan.fillObject` - val deserializer = new OrcSerde - val maybeStructOI = OrcFileOperator.getObjectInspector(file.filePath, Some(conf)) - val mutableRow = new SpecificMutableRow(_dataSchema.map(_.dataType)) - val unsafeProjection = UnsafeProjection.create(_dataSchema) - - maybeStructOI.map { oi => - val (fieldRefs, fieldOrdinals) = _dataSchema.zipWithIndex.map { - case (field, ordinal) => oi.getStructFieldRef(field.name) -> ordinal - }.unzip - - val unwrappers = fieldRefs.map(unwrapperFor) - - orcStructIterator.map { value => - val raw = deserializer.deserialize(value) - var i = 0 - while (i < fieldRefs.length) { - val fieldValue = oi.getStructFieldData(raw, fieldRefs(i)) - if (fieldValue == null) { - mutableRow.setNullAt(fieldOrdinals(i)) - } else { - unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i)) - } - i += 1 - } - unsafeProjection(mutableRow) + // Unwraps `OrcStruct`s to `UnsafeRow`s + val unsafeRowIterator = OrcRelation.unwrapOrcStructs( + file.filePath, conf, dataSchema, orcStructIterator + ) + + // Appends partition values + val fullOutput = 
dataSchema.toAttributes ++ partitionSchema.toAttributes + val joinedRow = new JoinedRow() + val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput) + + unsafeRowIterator.map { dataRow => + appendPartitionColumns(joinedRow(dataRow, file.partitionValues)) } - }.getOrElse(Iterator.empty) + } } } } @@ -311,55 +298,6 @@ private[orc] case class OrcTableScan( extends Logging with HiveInspectors { - private def addColumnIds( - dataSchema: StructType, - output: Seq[Attribute], - conf: Configuration): Unit = { - val ids = output.map(a => dataSchema.fieldIndex(a.name): Integer) - val (sortedIds, sortedNames) = ids.zip(attributes.map(_.name)).sorted.unzip - HiveShim.appendReadColumns(conf, sortedIds, sortedNames) - } - - // Transform all given raw `Writable`s into `InternalRow`s. - private def fillObject( - path: String, - conf: Configuration, - iterator: Iterator[Writable], - nonPartitionKeyAttrs: Seq[Attribute]): Iterator[InternalRow] = { - val deserializer = new OrcSerde - val maybeStructOI = OrcFileOperator.getObjectInspector(path, Some(conf)) - val mutableRow = new SpecificMutableRow(attributes.map(_.dataType)) - val unsafeProjection = UnsafeProjection.create(StructType.fromAttributes(nonPartitionKeyAttrs)) - - // SPARK-8501: ORC writes an empty schema ("struct<>") to an ORC file if the file contains zero - // rows, and thus couldn't give a proper ObjectInspector. In this case we just return an empty - // partition since we know that this file is empty. - maybeStructOI.map { soi => - val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.zipWithIndex.map { - case (attr, ordinal) => - soi.getStructFieldRef(attr.name) -> ordinal - }.unzip - val unwrappers = fieldRefs.map(unwrapperFor) - // Map each tuple to a row object - iterator.map { value => - val raw = deserializer.deserialize(value) - var i = 0 - while (i < fieldRefs.length) { - val fieldValue = soi.getStructFieldData(raw, fieldRefs(i)) - if (fieldValue == null) { - mutableRow.setNullAt(fieldOrdinals(i)) - } else { - unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i)) - } - i += 1 - } - unsafeProjection(mutableRow) - } - }.getOrElse { - Iterator.empty - } - } - def execute(): RDD[InternalRow] = { val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration) val conf = job.getConfiguration @@ -380,7 +318,7 @@ private[orc] case class OrcTableScan( .inferSchema(sqlContext, Map.empty, inputPaths) .getOrElse(sys.error("Failed to read schema from target ORC files.")) // Sets requested columns - addColumnIds(dataSchema, attributes, conf) + OrcRelation.setRequiredColumns(conf, dataSchema, StructType.fromAttributes(attributes)) if (inputPaths.isEmpty) { // the input path probably be pruned, return an empty RDD. 
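`OrcRelation.setRequiredColumns`, defined further down in this commit, is the shared replacement for `addColumnIds`: it pairs each requested column's index in the physical schema with its name, sorts the pairs by index, and passes both sequences to `HiveShim.appendReadColumns`. A small standalone model of that pairing follows, assuming a plain `Seq[String]` schema and `indexOf` in place of `StructType.fieldIndex`; the object and method names are illustrative only.

object RequiredColumnsSketch {
  // Map each requested field to its index in the physical (file) schema, then
  // sort ids and names together so the two sequences stay aligned when they
  // are handed to the reader configuration (HiveShim.appendReadColumns in the
  // real code).
  def requiredColumns(
      physicalFields: Seq[String],
      requestedFields: Seq[String]): (Seq[Int], Seq[String]) = {
    val ids = requestedFields.map(name => physicalFields.indexOf(name))
    ids.zip(requestedFields).sorted.unzip
  }

  def main(args: Array[String]): Unit = {
    val (ids, names) = requiredColumns(Seq("a", "b", "c", "d"), Seq("c", "a"))
    println(ids)    // List(0, 2)
    println(names)  // List(a, c)
  }
}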
@@ -403,7 +341,12 @@ private[orc] case class OrcTableScan( rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) => val writableIterator = iterator.map(_._2) - fillObject(split.getPath.toString, wrappedConf.value, writableIterator, attributes) + OrcRelation.unwrapOrcStructs( + split.getPath.toString, + wrappedConf.value, + StructType.fromAttributes(attributes), + writableIterator + ) } } } @@ -413,7 +356,7 @@ private[orc] object OrcTableScan { private[orc] val SARG_PUSHDOWN = "sarg.pushdown" } -private[orc] object OrcRelation { +private[orc] object OrcRelation extends HiveInspectors { // The ORC compression short names val shortOrcCompressionCodecNames = Map( "none" -> CompressionKind.NONE, @@ -429,5 +372,47 @@ private[orc] object OrcRelation { CompressionKind.ZLIB.name -> ".zlib", CompressionKind.LZO.name -> ".lzo" ) -} + def unwrapOrcStructs( + filePath: String, + conf: Configuration, + dataSchema: StructType, + iterator: Iterator[Writable]): Iterator[InternalRow] = { + val deserializer = new OrcSerde + val maybeStructOI = OrcFileOperator.getObjectInspector(filePath, Some(conf)) + val mutableRow = new SpecificMutableRow(dataSchema.map(_.dataType)) + val unsafeProjection = UnsafeProjection.create(dataSchema) + + def unwrap(oi: StructObjectInspector): Iterator[InternalRow] = { + val (fieldRefs, fieldOrdinals) = dataSchema.zipWithIndex.map { + case (field, ordinal) => oi.getStructFieldRef(field.name) -> ordinal + }.unzip + + val unwrappers = fieldRefs.map(unwrapperFor) + + iterator.map { value => + val raw = deserializer.deserialize(value) + var i = 0 + while (i < fieldRefs.length) { + val fieldValue = oi.getStructFieldData(raw, fieldRefs(i)) + if (fieldValue == null) { + mutableRow.setNullAt(fieldOrdinals(i)) + } else { + unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i)) + } + i += 1 + } + unsafeProjection(mutableRow) + } + } + + maybeStructOI.map(unwrap).getOrElse(Iterator.empty) + } + + def setRequiredColumns( + conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = { + val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer) + val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip + HiveShim.appendReadColumns(conf, sortedIDs, sortedNames) + } +} From 7d628ed09cda5e081218df92c06b956a9e8064a4 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 25 Mar 2016 20:22:02 +0800 Subject: [PATCH 3/6] Fixes Scala style --- .../scala/org/apache/spark/sql/hive/orc/OrcRelation.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 114b7206b7d30..ca1e7d1151907 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties import org.apache.hadoop.hive.serde2.objectinspector.{SettableStructObjectInspector, StructObjectInspector} import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils} import org.apache.hadoop.io.{NullWritable, Writable} -import org.apache.hadoop.mapred.{JobConf, RecordWriter, Reporter, InputFormat => MapRedInputFormat, OutputFormat => MapRedOutputFormat} +import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter} import 
org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl @@ -410,7 +410,7 @@ private[orc] object OrcRelation extends HiveInspectors { } def setRequiredColumns( - conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = { + conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = { val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer) val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip HiveShim.appendReadColumns(conf, sortedIDs, sortedNames) From 2cbb56dc2171870be6d2a4c3e47f301f56d2cf82 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 25 Mar 2016 23:21:12 +0800 Subject: [PATCH 4/6] Fixes ORC record reader instantiation --- .../spark/sql/hive/orc/OrcRelation.scala | 27 ++++++++----------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index ca1e7d1151907..c47bd30a734f5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hive.orc +import java.net.URI import java.util.Properties import scala.collection.JavaConverters._ @@ -29,9 +30,9 @@ import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties import org.apache.hadoop.hive.serde2.objectinspector.{SettableStructObjectInspector, StructObjectInspector} import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils} import org.apache.hadoop.io.{NullWritable, Writable} -import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter} +import org.apache.hadoop.mapred.{JobConf, RecordWriter, Reporter, InputFormat => MapRedInputFormat, OutputFormat => MapRedOutputFormat} import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark.broadcast.Broadcast @@ -154,29 +155,23 @@ private[sql] class DefaultSource maybePhysicalSchema.fold(Iterator.empty: Iterator[InternalRow]) { physicalSchema => OrcRelation.setRequiredColumns(conf, physicalSchema, dataSchema) - // Creates `OrcRecordReader`s using a dummy `OrcNewInputFormat` containing only a single - // input file. 
- val orcRecordReaders = { + val orcRecordReader = { val job = Job.getInstance(conf) FileInputFormat.setInputPaths(job, file.filePath) val inputFormat = new OrcNewInputFormat - val splits = inputFormat.getSplits(job) + val fileSplit = new FileSplit( + new Path(new URI(file.filePath)), file.start, file.length, Array.empty + ) - splits.asScala.map { split => - val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) - val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - inputFormat.createRecordReader(split, hadoopAttemptContext) - } - } - - val orcStructIterator = orcRecordReaders.iterator.flatMap { - new RecordReaderIterator[OrcStruct](_) + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + inputFormat.createRecordReader(fileSplit, hadoopAttemptContext) } // Unwraps `OrcStruct`s to `UnsafeRow`s val unsafeRowIterator = OrcRelation.unwrapOrcStructs( - file.filePath, conf, dataSchema, orcStructIterator + file.filePath, conf, dataSchema, new RecordReaderIterator[OrcStruct](orcRecordReader) ) // Appends partition values From a26973ef134e1d7e09763add7e49f1acd0cac685 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 25 Mar 2016 23:25:32 +0800 Subject: [PATCH 5/6] Fixes Scala style, again --- .../scala/org/apache/spark/sql/hive/orc/OrcRelation.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index c47bd30a734f5..070787ecd3ae9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -20,8 +20,6 @@ package org.apache.spark.sql.hive.orc import java.net.URI import java.util.Properties -import scala.collection.JavaConverters._ - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hive.conf.HiveConf.ConfVars @@ -30,7 +28,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties import org.apache.hadoop.hive.serde2.objectinspector.{SettableStructObjectInspector, StructObjectInspector} import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils} import org.apache.hadoop.io.{NullWritable, Writable} -import org.apache.hadoop.mapred.{JobConf, RecordWriter, Reporter, InputFormat => MapRedInputFormat, OutputFormat => MapRedOutputFormat} +import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter} import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl From 25d894f02148ef2bf0018c7fb83ea798f3ad6e4a Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sat, 26 Mar 2016 14:32:25 +0800 Subject: [PATCH 6/6] Addresses PR comments --- .../scala/org/apache/spark/sql/hive/orc/OrcRelation.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 070787ecd3ae9..7c4a0a0c0f09f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -149,8 +149,10 @@ 
private[sql] class DefaultSource // case, `OrcFileOperator.readSchema` returns `None`, and we can simply return an empty // iterator. val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf)) - - maybePhysicalSchema.fold(Iterator.empty: Iterator[InternalRow]) { physicalSchema => + if (maybePhysicalSchema.isEmpty) { + Iterator.empty + } else { + val physicalSchema = maybePhysicalSchema.get OrcRelation.setRequiredColumns(conf, physicalSchema, dataSchema) val orcRecordReader = {
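Taken together, the hunks above leave `buildReader` returning a closure that, for each `PartitionedFile`: checks for an empty physical schema, prunes columns, reads the split with an ORC record reader, unwraps the resulting `OrcStruct`s, and appends the file's partition values. Below is a minimal standalone model of that flow, with plain Scala collections standing in for Spark's and Hive's types; none of the names in it are Spark API.

object OrcReadPathSketch {
  type Row = Seq[Any]

  // One partitioned file: an optional physical schema (None models the empty
  // footer of a zero-row ORC file, see SPARK-8501), its decoded records, and
  // the partition values contributed by the directory layout.
  final case class FileSlice(
      physicalSchema: Option[Seq[String]],
      records: Seq[Map[String, Any]],
      partitionValues: Row)

  def readFile(dataColumns: Seq[String], file: FileSlice): Iterator[Row] =
    file.physicalSchema match {
      case None => Iterator.empty   // empty file: return an empty iterator
      case Some(_) =>
        file.records.iterator.map { record =>
          // "Unwrap" the requested data columns, then append partition values,
          // mirroring the JoinedRow + projection step in the real closure.
          val dataRow: Row = dataColumns.map(col => record.getOrElse(col, null))
          dataRow ++ file.partitionValues
        }
    }

  def main(args: Array[String]): Unit = {
    val file = FileSlice(
      physicalSchema = Some(Seq("a", "b")),
      records = Seq(Map("a" -> 1, "b" -> "x"), Map("a" -> 2)),
      partitionValues = Seq("p=2016"))
    readFile(Seq("a", "b"), file).foreach(println)
  }
}

The empty-schema guard at the top of `readFile` corresponds to the `maybePhysicalSchema` check that PATCH 6/6 rewrites from `fold` to `if`/`else` in the hunk above.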