[SPARK-14551][SQL] Reduce number of NameNode calls in OrcRelation
rbalamohan committed Apr 21, 2016
1 parent 19502a8 commit d6bc52d
Showing 3 changed files with 15 additions and 20 deletions.
4 changes: 0 additions & 4 deletions sql/core/pom.xml
@@ -128,10 +128,6 @@
<artifactId>xbean-asm5-shaded</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.spark-project.hive</groupId>
<artifactId>hive-exec</artifactId>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
OrcRecordReader.java → SparkOrcNewRecordReader.java
@@ -21,26 +21,26 @@
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.util.List;

/**
* This is based on
* This is based on hive-exec-1.2.1
* {@link org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat.OrcRecordReader}.
* This class exposes getObjectInspector which can be used for reducing
* NameNode calls in OrcRelation.
*/
public class OrcRecordReader extends RecordReader<NullWritable, OrcStruct> {
public class SparkOrcNewRecordReader extends
org.apache.hadoop.mapreduce.RecordReader<NullWritable, OrcStruct> {
private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
private final int numColumns;
OrcStruct value;
private float progress = 0.0f;
private ObjectInspector objectInspector;

public OrcRecordReader(Reader file, Configuration conf,
public SparkOrcNewRecordReader(Reader file, Configuration conf,
long offset, long length) throws IOException {
List<OrcProto.Type> types = file.getTypes();
numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
OrcRelation.scala
@@ -31,7 +31,6 @@ import org.apache.hadoop.io.{NullWritable, Writable}
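
The Javadoc above is the crux of the change: the record reader is constructed from an already opened Reader, so the same file handle yields both the rows and the ObjectInspector, and OrcRelation never has to reopen the file just to learn its schema. A minimal usage sketch in Scala follows; SparkOrcNewRecordReader, its constructor arguments, and getObjectInspector come from this commit, while the standalone driver, the whole-file offset/length, and the bare iteration loop are illustrative assumptions (Spark itself drives the reader through RecordReaderIterator).

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.orc.{OrcFile, OrcStruct}
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
// An import for SparkOrcNewRecordReader is assumed; the class sits next to
// OrcRelation in Spark's hive module.

object SparkOrcNewRecordReaderSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val path = new Path(args(0)) // some ORC file, e.g. on HDFS

    // One file open: the same Reader backs both the row stream and the schema.
    val orcReader = OrcFile.createReader(path, OrcFile.readerOptions(conf))
    val recordReader = new SparkOrcNewRecordReader(orcReader, conf, 0L, Long.MaxValue)

    // The ObjectInspector comes from the Reader already in hand, so there is no
    // second file open (and no extra NameNode round trip) just to get the schema.
    val structOI = recordReader.getObjectInspector.asInstanceOf[StructObjectInspector]
    println(structOI.getTypeName)

    while (recordReader.nextKeyValue()) {
      val row: OrcStruct = recordReader.getCurrentValue
      // ... unwrap the row's fields via structOI ...
    }
    recordReader.close()
  }
}
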
import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{HadoopRDD, RDD}
@@ -148,20 +147,21 @@ private[sql] class DefaultSource
val fileSplit = new FileSplit(
new Path(new URI(file.filePath)), file.start, file.length, Array.empty
)
// Create RecordReader here. This will help in getting
// ObjectInspector during recordReader creation itself and can
// avoid NN call in unwrapOrcStructs to get ObjectInspector for the
// file. Would be helpful for partitioned datasets.
new OrcRecordReader(OrcFile.createReader(new Path(new URI(file
.filePath)), OrcFile.readerOptions(conf)), conf, fileSplit.getStart(),
fileSplit.getLength())
// Custom OrcRecordReader is used to get
// ObjectInspector during recordReader creation itself and can
// avoid NameNode call in unwrapOrcStructs per file.
// Specifically would be helpful for partitioned datasets.
val orcReader = OrcFile.createReader(
new Path(new URI(file.filePath)), OrcFile.readerOptions(conf))
new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart(), fileSplit.getLength())
}

// Unwraps `OrcStruct`s to `UnsafeRow`s
val unsafeRowIterator = OrcRelation.unwrapOrcStructs(conf, requiredSchema,
Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
new RecordReaderIterator[OrcStruct](orcRecordReader)
)
val unsafeRowIterator = OrcRelation.unwrapOrcStructs(
conf,
requiredSchema,
Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
new RecordReaderIterator[OrcStruct](orcRecordReader))

// Appends partition values
val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
@@ -325,8 +325,7 @@ private[orc] case class OrcTableScan(

rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
val writableIterator = iterator.map(_._2)
val maybeStructOI = OrcFileOperator.getObjectInspector(
split.getPath.toString, Some(conf))
val maybeStructOI = OrcFileOperator.getObjectInspector(split.getPath.toString, Some(conf))
OrcRelation.unwrapOrcStructs(
wrappedConf.value,
StructType.fromAttributes(attributes),
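
Taken together, the OrcRelation changes mean buildReader obtains the StructObjectInspector from the SparkOrcNewRecordReader it just created, instead of asking OrcFileOperator.getObjectInspector to open the file again purely for the schema (OrcTableScan, in the last hunk above, still uses that per-split lookup). A rough sketch of the two ways of getting the inspector for one file: OrcFileOperator and its argument shape are taken from the surrounding code, the Option[StructObjectInspector] return type is inferred from how maybeStructOI is used, and the sketch assumes it lives in the same package as OrcRelation so that OrcFileOperator is visible; everything else is illustrative.

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.orc.OrcFile
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector

object ObjectInspectorPathsSketch {
  // Schema via an extra open: the ORC footer is read here once just for the
  // inspector, and the file is opened again later when the record reader is built.
  def inspectorViaExtraOpen(filePath: String, conf: Configuration): Option[StructObjectInspector] =
    OrcFileOperator.getObjectInspector(filePath, Some(conf))

  // Schema via the reader itself (what buildReader now does): a single
  // OrcFile.createReader call serves both the record reader and the inspector.
  def inspectorViaReader(
      filePath: String,
      conf: Configuration,
      start: Long,
      length: Long): Option[StructObjectInspector] = {
    val reader = OrcFile.createReader(new Path(new URI(filePath)), OrcFile.readerOptions(conf))
    val recordReader = new SparkOrcNewRecordReader(reader, conf, start, length)
    // The caller would keep recordReader for the actual scan and close it when done.
    Some(recordReader.getObjectInspector.asInstanceOf[StructObjectInspector])
  }
}

For a partitioned dataset the difference compounds to one fewer NameNode round trip per file read, which is the saving SPARK-14551 is after.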
