
Commit

Fixes ORC record reader instantiation
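Instead of feeding the single input path to a dummy `OrcNewInputFormat` and building one `OrcRecordReader` per split returned by `getSplits`, the read path in `DefaultSource` now constructs a `FileSplit` directly from `file.filePath`, `file.start`, and `file.length` and instantiates exactly one record reader for it, which is then wrapped in a single `RecordReaderIterator[OrcStruct]`. A consolidated sketch of the new reader construction follows the diff below.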
liancheng committed Mar 25, 2016
1 parent 7d628ed commit 2cbb56d
Showing 1 changed file with 11 additions and 16 deletions.

@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive.orc
 
+import java.net.URI
 import java.util.Properties
 
 import scala.collection.JavaConverters._
@@ -29,9 +30,9 @@ import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties
 import org.apache.hadoop.hive.serde2.objectinspector.{SettableStructObjectInspector, StructObjectInspector}
 import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
 import org.apache.hadoop.io.{NullWritable, Writable}
-import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
+import org.apache.hadoop.mapred.{JobConf, RecordWriter, Reporter, InputFormat => MapRedInputFormat, OutputFormat => MapRedOutputFormat}
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 
 import org.apache.spark.broadcast.Broadcast
@@ -154,29 +155,23 @@ private[sql] class DefaultSource
       maybePhysicalSchema.fold(Iterator.empty: Iterator[InternalRow]) { physicalSchema =>
         OrcRelation.setRequiredColumns(conf, physicalSchema, dataSchema)
 
-        // Creates `OrcRecordReader`s using a dummy `OrcNewInputFormat` containing only a single
-        // input file.
-        val orcRecordReaders = {
+        val orcRecordReader = {
           val job = Job.getInstance(conf)
           FileInputFormat.setInputPaths(job, file.filePath)
 
           val inputFormat = new OrcNewInputFormat
-          val splits = inputFormat.getSplits(job)
+          val fileSplit = new FileSplit(
+            new Path(new URI(file.filePath)), file.start, file.length, Array.empty
+          )
 
-          splits.asScala.map { split =>
-            val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
-            val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-            inputFormat.createRecordReader(split, hadoopAttemptContext)
-          }
-        }
-
-        val orcStructIterator = orcRecordReaders.iterator.flatMap {
-          new RecordReaderIterator[OrcStruct](_)
+          val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+          val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+          inputFormat.createRecordReader(fileSplit, hadoopAttemptContext)
         }
 
         // Unwraps `OrcStruct`s to `UnsafeRow`s
         val unsafeRowIterator = OrcRelation.unwrapOrcStructs(
-          file.filePath, conf, dataSchema, orcStructIterator
+          file.filePath, conf, dataSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
        )
 
         // Appends partition values
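
For reference, below is a minimal self-contained sketch of the pattern the new code follows, assuming Hive's `OrcNewInputFormat` is on the classpath and that a Hadoop `Configuration` plus the file's path, start offset, and length are supplied by the caller; the object and method names (`OrcReaderSketch`, `createOrcReaderForSplit`) are hypothetical and not part of the patch.

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.orc.{OrcNewInputFormat, OrcStruct}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.{JobID, RecordReader, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

object OrcReaderSketch {
  // Hypothetical helper mirroring the pattern introduced by this commit: build a
  // FileSplit covering exactly [start, start + length) of one ORC file and ask
  // OrcNewInputFormat for a single record reader over that range.
  def createOrcReaderForSplit(
      conf: Configuration,
      filePath: String,
      start: Long,
      length: Long): RecordReader[NullWritable, OrcStruct] = {
    // One split per file slice, instead of delegating split computation to getSplits.
    val fileSplit = new FileSplit(new Path(new URI(filePath)), start, length, Array.empty[String])

    // A synthetic task attempt context is sufficient for createRecordReader.
    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
    val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)

    new OrcNewInputFormat().createRecordReader(fileSplit, hadoopAttemptContext)
  }
}

Because the incoming file description (`file` in the diff) already carries an explicit start offset and length, constructing the split directly keeps one reader per file slice; the previous approach of calling `getSplits` on a freshly configured job left split boundaries up to Hadoop, which is not obliged to respect that range.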
