@@ -28,13 +28,11 @@ import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties
import org.apache.hadoop.hive.serde2.objectinspector.{SettableStructObjectInspector, StructObjectInspector}
import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
import org.apache.hadoop.io.{NullWritable, Writable}
import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
import org.apache.hadoop.mapred.{JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -109,6 +107,38 @@ private[sql] class DefaultSource
}
}

def mapRequiredColumns(
conf: Configuration,
dataSchema: StructType,
physicalSchema: StructType,
requiredSchema: StructType): StructType = {
/**
* The column names in requiredSchema might not match the physical schema
* names.
*
* This is especially true when the data was written by Hive: such ORC
* files carry positional column names (_col0, _col1, etc.). Hive 2.0
* fixes this, so that the physical column names match those in the
* metastore.
*
* For backward compatibility, the physical column names therefore have
* to be mapped onto the columns requested in requiredSchema.
*/

// For each column in requiredSchema, look up its ordinal in dataSchema
val ids = requiredSchema.map(a => dataSchema.fieldIndex(a.name): Integer).sorted

// For each ordinal, pick the corresponding name from physicalSchema (e.g.
// _col1 for Hive-written files; otherwise it matches the metastore name)
val names = ids.map(i => physicalSchema.fieldNames(i))

HiveShim.appendReadColumns(conf, ids, names)

val mappedReqPhysicalSchemaStruct =
StructType(physicalSchema.filter(struct => names.contains(struct.name)))

mappedReqPhysicalSchemaStruct
}

override def buildReader(
sqlContext: SQLContext,
dataSchema: StructType,
@@ -139,8 +169,9 @@ private[sql] class DefaultSource
Iterator.empty
} else {
val physicalSchema = maybePhysicalSchema.get
OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)

// Map requiredSchema onto the physical schema and get the resulting StructType
val mappedReqPhysicalSchema =
mapRequiredColumns(conf, dataSchema, physicalSchema, requiredSchema)
val orcRecordReader = {
val job = Job.getInstance(conf)
FileInputFormat.setInputPaths(job, file.filePath)
@@ -157,8 +188,10 @@

// Unwraps `OrcStruct`s to `UnsafeRow`s
val unsafeRowIterator = OrcRelation.unwrapOrcStructs(
file.filePath, conf, requiredSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
)
file.filePath,
conf,
mappedReqPhysicalSchema,
new RecordReaderIterator[OrcStruct](orcRecordReader))

// Appends partition values
val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
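A hedged usage sketch of how this reader path is exercised end to end (the path and column names are hypothetical): reading a Hive-written ORC directory through the data source API with a pruned projection is exactly what makes requiredSchema ("a", "c") differ from the files' physical _colN names, the case mapRequiredColumns now handles before unwrapOrcStructs runs:

val df = sqlContext.read
  .format("orc")
  .load("/warehouse/db.db/hive_written_table")  // hypothetical location
  .select("a", "c")                             // column pruning -> requiredSchema
df.show()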
@@ -271,67 +304,6 @@ private[orc] class OrcOutputWriter(
}
}

private[orc] case class OrcTableScan(
@transient sqlContext: SQLContext,
attributes: Seq[Attribute],
filters: Array[Filter],
@transient inputPaths: Seq[FileStatus])
extends Logging
with HiveInspectors {

def execute(): RDD[InternalRow] = {
val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
val conf = job.getConfiguration

// Tries to push down filters if ORC filter push-down is enabled
if (sqlContext.conf.orcFilterPushDown) {
OrcFilters.createFilter(filters).foreach { f =>
conf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
conf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
}
}

// Figure out the actual schema from the ORC source (without partition columns) so that we
// can pick the correct ordinals. Note that this assumes that all files have the same schema.
val orcFormat = new DefaultSource
val dataSchema =
orcFormat
.inferSchema(sqlContext, Map.empty, inputPaths)
.getOrElse(sys.error("Failed to read schema from target ORC files."))
// Sets requested columns
OrcRelation.setRequiredColumns(conf, dataSchema, StructType.fromAttributes(attributes))

if (inputPaths.isEmpty) {
// the input path probably be pruned, return an empty RDD.
return sqlContext.sparkContext.emptyRDD[InternalRow]
}
FileInputFormat.setInputPaths(job, inputPaths.map(_.getPath): _*)

val inputFormatClass =
classOf[OrcInputFormat]
.asInstanceOf[Class[_ <: MapRedInputFormat[NullWritable, Writable]]]

val rdd = sqlContext.sparkContext.hadoopRDD(
conf.asInstanceOf[JobConf],
inputFormatClass,
classOf[NullWritable],
classOf[Writable]
).asInstanceOf[HadoopRDD[NullWritable, Writable]]

val wrappedConf = new SerializableConfiguration(conf)

rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
val writableIterator = iterator.map(_._2)
OrcRelation.unwrapOrcStructs(
split.getPath.toString,
wrappedConf.value,
StructType.fromAttributes(attributes),
writableIterator
)
}
}
}

private[orc] object OrcTableScan {
// This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public.
private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
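The SARG_PUSHDOWN property above is only populated when ORC filter push-down is enabled (sqlContext.conf.orcFilterPushDown). A hedged sketch of turning it on from user code, with a hypothetical table path and predicate; spark.sql.orc.filterPushdown is the real configuration key behind that flag:

sqlContext.setConf("spark.sql.orc.filterPushdown", "true")

sqlContext.read
  .format("orc")
  .load("/warehouse/db.db/events")      // hypothetical location
  .filter("event_date = '2016-05-01'")  // candidate for a SearchArgument
  .count()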
@@ -389,11 +361,4 @@ private[orc] object OrcRelation extends HiveInspectors {

maybeStructOI.map(unwrap).getOrElse(Iterator.empty)
}

def setRequiredColumns(
conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = {
val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer)
val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip
HiveShim.appendReadColumns(conf, sortedIDs, sortedNames)
}
}
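For context on the HiveShim.appendReadColumns call used by mapRequiredColumns: the assumption here is that it delegates to Hive's ColumnProjectionUtils, which records the projection in Hive's standard read-column configuration keys that the ORC record reader consults for column pruning. The sketch below only illustrates that assumed effect on a Configuration; it is not the shim's implementation:

import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
// Ordinals and physical names as computed by mapRequiredColumns (example values).
val ids = Seq(0, 2)
val names = Seq("_col0", "_col2")

// Assumed effect: comma-separated lists in Hive's read-column keys.
conf.set("hive.io.file.readcolumn.ids", ids.mkString(","))
conf.set("hive.io.file.readcolumn.names", names.mkString(","))
conf.setBoolean("hive.io.file.read.all.columns", false)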