[SPARK-14116][SQL] Implements buildReader() for ORC data source #11936

Closed
Changes from 5 commits
@@ -208,9 +208,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {

val bucketedRDD = new UnionRDD(t.sqlContext.sparkContext,
(0 until spec.numBuckets).map { bucketId =>
-bucketedDataMap.get(bucketId).getOrElse {
-  t.sqlContext.emptyResult: RDD[InternalRow]
-}
+bucketedDataMap.getOrElse(bucketId, t.sqlContext.emptyResult: RDD[InternalRow])
})
bucketedRDD
}
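
The simplification relies on `Map#getOrElse`, which folds the lookup and the missing-key default into one call, with the default evaluated by name. A minimal stand-alone sketch, using a plain `Map` in place of `bucketedDataMap`:

```scala
// Illustrative only: a plain Map of strings stands in for the RDD map above.
val bucketedData = Map(0 -> "bucket-0", 2 -> "bucket-2")

// Before: explicit lookup followed by a separate default.
val verbose = bucketedData.get(1).getOrElse { "empty" }

// After: one call; the default expression is only evaluated on a miss.
val concise = bucketedData.getOrElse(1, "empty")

assert(verbose == concise)
```
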
@@ -387,7 +385,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
result.setColumn(resultIdx, input.column(inputIdx))
inputIdx += 1
} else {
-require(partitionColumnSchema.fields.filter(_.name.equals(attr.name)).length == 1)
+require(partitionColumnSchema.fields.count(_.name == attr.name) == 1)
var partitionIdx = 0
partitionColumnSchema.fields.foreach { f => {
if (f.name.equals(attr.name)) {
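
Replacing `filter(...).length` with `count(...)` above is behavior-preserving but avoids building an intermediate collection, and `_.name == attr.name` is the idiomatic spelling of the equality check. A hedged sketch with plain strings standing in for the schema's `StructField`s:

```scala
// Illustrative only: field names stand in for partitionColumnSchema.fields.
val fieldNames = Seq("year", "month", "day")
val attrName = "month"

// Before: materializes a temporary Seq just to take its length.
val viaFilter = fieldNames.filter(_.equals(attrName)).length == 1

// After: a single pass with no intermediate allocation.
val viaCount = fieldNames.count(_ == attrName) == 1

assert(viaFilter && viaCount)
```
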
@@ -32,7 +32,7 @@ case class PartitionedFile(
filePath: String,
start: Long,
length: Long) {
-override def toString(): String = {
+override def toString: String = {
s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"
}
}
@@ -44,7 +44,7 @@ case class PartitionedFile(
*
* TODO: This currently does not take locality information about the files into account.
*/
-case class FilePartition(val index: Int, files: Seq[PartitionedFile]) extends Partition
+case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends Partition

class FileScanRDD(
@transient val sqlContext: SQLContext,
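
Both changes in this file are pure style cleanups: `toString` takes no arguments and has no side effects, so the override drops the parentheses, and case-class constructor parameters are already public `val`s, so the explicit `val` on `index` was redundant. A small sketch, unrelated to Spark's actual classes:

```scala
// Illustrative only: the two definitions expose `index` identically; `val` adds nothing.
case class WithVal(val index: Int)
case class WithoutVal(index: Int) {
  override def toString: String = s"partition $index"   // parameterless override
}

assert(WithVal(1).index == WithoutVal(1).index)
```
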
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{DataSourceScan, SparkPlan}
import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types._

/**
* A strategy for planning scans over collections of files that might be partitioned or bucketed
@@ -56,9 +55,10 @@ import org.apache.spark.sql.types._
*/
private[sql] object FileSourceStrategy extends Strategy with Logging {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-case PhysicalOperation(projects, filters, l@LogicalRelation(files: HadoopFsRelation, _, _))
+case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _))
if (files.fileFormat.toString == "TestFileFormat" ||
-files.fileFormat.isInstanceOf[parquet.DefaultSource]) &&
+files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
+files.fileFormat.toString == "ORC") &&
files.sqlContext.conf.parquetFileScan =>

Contributor: Let's rename this conf.

// Filters on this relation fall into four categories based on where we can use them to avoid
// reading unneeded data:
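
For context, this guard is what routes ORC through the new `buildReader()` path: a relation qualifies if its format is the test format, the Parquet source, or a format whose `toString` is "ORC". A stand-alone sketch of that dispatch with plain strings in place of the real `FileFormat` instances (the helper name is hypothetical, not part of the PR):

```scala
// Illustrative only: format names stand in for FileFormat instances. In the real
// guard, Parquet is matched by class and ORC by its toString value, "ORC".
def usesFileScanPath(formatName: String): Boolean =
  formatName == "TestFileFormat" || formatName == "Parquet" || formatName == "ORC"

assert(usesFileScanPath("ORC"))
assert(!usesFileScanPath("JSON"))
```
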
@@ -81,10 +81,10 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val bucketColumns =

Contributor: seems this variable is never used?

Contributor Author: Yea... Didn't remove it since I haven't gone through that part thoroughly.

AttributeSet(
files.bucketSpec
-.map(_.bucketColumnNames)
-.getOrElse(Nil)
-.map(l.resolveQuoted(_, files.sqlContext.conf.resolver)
-.getOrElse(sys.error(""))))
+.map(_.bucketColumnNames)
+.getOrElse(Nil)
+.map(l.resolveQuoted(_, files.sqlContext.conf.resolver)
+.getOrElse(sys.error(""))))

// Partition keys are not available in the statistics of the files.
val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty)
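
For intuition about the split above: a predicate can only be evaluated against file contents if it references no partition column; everything else is used for partition pruning. A stdlib-only sketch with hypothetical predicates and column sets (the real code works on Catalyst expressions and `AttributeSet`s):

```scala
// Illustrative only: each predicate is paired with the columns it references.
val partitionSet = Set("date", "country")
val filters = Seq(
  "date = '2016-03-24'" -> Set("date"),   // touches a partition column
  "price > 10"          -> Set("price"))  // data columns only

// Keep only predicates whose references do not intersect the partition columns.
val dataFilters = filters.collect {
  case (pred, refs) if refs.intersect(partitionSet).isEmpty => pred
}

assert(dataFilters == Seq("price > 10"))
```
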
@@ -101,8 +101,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {

val readDataColumns =
dataColumns
-.filter(requiredAttributes.contains)
-.filterNot(partitionColumns.contains)
+.filter(requiredAttributes.contains)
+.filterNot(partitionColumns.contains)
val prunedDataSchema = readDataColumns.toStructType
logInfo(s"Pruned Data Schema: ${prunedDataSchema.simpleString(5)}")

@@ -120,13 +120,12 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
case Some(bucketing) if files.sqlContext.conf.bucketingEnabled =>
logInfo(s"Planning with ${bucketing.numBuckets} buckets")
val bucketed =
-selectedPartitions
-  .flatMap { p =>
-    p.files.map(f => PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen))
-  }.groupBy { f =>
+selectedPartitions.flatMap { p =>
+  p.files.map(f => PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen))
+}.groupBy { f =>
BucketingUtils
-.getBucketId(new Path(f.filePath).getName)
-.getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
+.getBucketId(new Path(f.filePath).getName)
+.getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
}

(0 until bucketing.numBuckets).map { bucketId =>
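
The reformatted block above groups each `PartitionedFile` by the bucket id encoded in its file name, failing fast on any file that does not parse. A rough stand-alone sketch of the idea with a hypothetical file-name pattern (the real parsing lives in `BucketingUtils.getBucketId`, and Spark's actual naming scheme may differ):

```scala
// Illustrative only: assume bucket files end in "_<bucketId>" before the extension.
val BucketedName = ".*_(\\d+)\\.parquet".r

def getBucketId(fileName: String): Option[Int] = fileName match {
  case BucketedName(id) => Some(id.toInt)
  case _                => None
}

val fileNames = Seq(
  "part-r-00000_00002.parquet",
  "part-r-00001_00002.parquet",
  "part-r-00000_00005.parquet")

// Group by bucket id, erroring out on anything that is not a bucket file.
val byBucket: Map[Int, Seq[String]] =
  fileNames.groupBy(f => getBucketId(f).getOrElse(sys.error(s"Invalid bucket file $f")))

assert(byBucket(2).size == 2 && byBucket(5).size == 1)
```
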
@@ -321,11 +321,11 @@ private[sql] class DefaultSource
// Try to push down filters when filter push-down is enabled.
val pushed = if (sqlContext.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
filters
-// Collects all converted Parquet filter predicates. Notice that not all predicates can be
-// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-// is used here.
-.flatMap(ParquetFilters.createFilter(dataSchema, _))
-.reduceOption(FilterApi.and)
+// Collects all converted Parquet filter predicates. Notice that not all predicates can be
+// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+// is used here.
+.flatMap(ParquetFilters.createFilter(dataSchema, _))
+.reduceOption(FilterApi.and)
} else {
None
}
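
The comment re-indented above describes a pattern worth seeing in isolation: `flatMap` over an `Option`-returning conversion drops whatever cannot be converted, and `reduceOption` combines the survivors, returning `None` when nothing is left. A stdlib-only sketch with hypothetical predicates (the real code turns data source filters into Parquet `FilterPredicate`s via `ParquetFilters.createFilter`):

```scala
// Illustrative only: a conversion that fails (None) for unsupported predicates.
def createFilter(pred: String): Option[String] =
  if (pred.contains("=")) Some(s"eq($pred)") else None

val predicates = Seq("a=1", "b=2", "not convertible")

// flatMap keeps only the successful conversions; reduceOption ANDs them together
// and yields None when no predicate could be pushed down (mirroring `pushed` above).
val pushed: Option[String] = predicates
  .flatMap(createFilter)
  .reduceOption((l, r) => s"and($l, $r)")

assert(pushed == Some("and(eq(a=1), eq(b=2))"))
```
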
@@ -17,7 +17,6 @@

package org.apache.spark.sql.execution.datasources.text

-import com.google.common.base.Objects
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.hive.HiveMetastoreTypes
import org.apache.spark.sql.types.StructType

@@ -92,7 +91,6 @@ private[orc] object OrcFileOperator extends Logging {
// TODO: Check if the paths coming in are already qualified and simplify.
val origPath = new Path(pathStr)
val fs = origPath.getFileSystem(conf)
-val path = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
.filterNot(_.isDirectory)
.map(_.getPath)