
[SPARK-13664][SQL] Add a strategy for planning partitioned and bucketed scans of files #11646

Closed · wants to merge 10 commits
@@ -150,7 +150,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
requiredColumns: Array[String],
filters: Array[Filter],
bucketSet: Option[BitSet],
inputFiles: Array[FileStatus],
inputFiles: Seq[FileStatus],
broadcastedConf: Broadcast[SerializableConfiguration],
options: Map[String, String]): RDD[InternalRow] = {
// TODO: This does not handle cases where column pruning has been performed.
@@ -17,8 +17,18 @@

package org.apache.spark.sql.catalyst

import org.apache.spark.sql.catalyst.analysis._

private[spark] trait CatalystConf {
def caseSensitiveAnalysis: Boolean

def resolver: Resolver = {
if (caseSensitiveAnalysis) {
caseSensitiveResolution
} else {
caseInsensitiveResolution
}
}
}
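
The new resolver helper just selects a name-equality function based on the case-sensitivity flag. Below is a minimal, self-contained sketch of the same pattern; the object name ResolverSketch is made up for illustration, but the two resolution functions mirror the catalyst definitions (plain equality vs. equalsIgnoreCase).

// Sketch only: mirrors CatalystConf.resolver, not the Spark definitions themselves.
object ResolverSketch {
  // A Resolver compares two column names and says whether they match.
  type Resolver = (String, String) => Boolean

  val caseSensitiveResolution: Resolver = (a, b) => a == b
  val caseInsensitiveResolution: Resolver = (a, b) => a.equalsIgnoreCase(b)

  def resolver(caseSensitiveAnalysis: Boolean): Resolver =
    if (caseSensitiveAnalysis) caseSensitiveResolution else caseInsensitiveResolution

  def main(args: Array[String]): Unit = {
    println(resolver(caseSensitiveAnalysis = false)("Year", "year")) // true
    println(resolver(caseSensitiveAnalysis = true)("Year", "year"))  // false
  }
}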

/**
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
import org.apache.spark.sql.types.StructType


abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
@@ -116,6 +117,23 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {

override lazy val canonicalized: LogicalPlan = EliminateSubqueryAliases(this)

/**
* Resolves a given schema to concrete [[Attribute]] references in this query plan. This function
* should only be called on analyzed plans since it will throw [[AnalysisException]] for
* unresolved [[Attribute]]s.
*/
def resolve(schema: StructType, resolver: Resolver): Seq[Attribute] = {
schema.map { field =>
resolveQuoted(field.name, resolver).map {
case a: AttributeReference => a
case other => sys.error(s"can not handle nested schema yet... plan $this")
}.getOrElse {
throw new AnalysisException(
s"Unable to resolve ${field.name} given [${output.map(_.name).mkString(", ")}]")
}
}
}
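
As a rough illustration of how this new resolve overload might be called on an analyzed plan: resolve and caseInsensitiveResolution exist in catalyst, while the helper name, the year column, and the assumption that the plan's output contains such a column are made up for this sketch.

import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical helper: map a partition schema onto the plan's output attributes.
// Throws AnalysisException if a field (e.g. "year") is not among analyzedPlan.output.
def resolvePartitionColumns(analyzedPlan: LogicalPlan): Seq[Attribute] = {
  val partitionSchema = StructType(StructField("year", IntegerType) :: Nil)
  analyzedPlan.resolve(partitionSchema, caseInsensitiveResolution)
}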

/**
* Optionally resolves the given strings to a [[NamedExpression]] using the input from all child
* nodes of this LogicalPlan. The attribute is expressed as
@@ -241,7 +241,7 @@ private[sql] object PhysicalRDD {

val bucketSpec = relation match {
// TODO: this should be closer to bucket planning.
case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled() => r.bucketSpec
case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled => r.bucketSpec
case _ => None
}

@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy}

class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
val sparkContext: SparkContext = sqlContext.sparkContext
@@ -29,6 +29,7 @@ class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {

def strategies: Seq[Strategy] =
sqlContext.experimental.extraStrategies ++ (
FileSourceStrategy ::
DataSourceStrategy ::
DDLStrategy ::
SpecialLimits ::
@@ -143,7 +143,7 @@ case class DataSource(
SparkHadoopUtil.get.globPathIfNecessary(qualified)
}.toArray

val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths)
val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths, None)
val dataSchema = userSpecifiedSchema.orElse {
format.inferSchema(
sqlContext,
@@ -208,7 +208,20 @@ case class DataSource(
SparkHadoopUtil.get.globPathIfNecessary(qualified)
}.toArray

val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths)
// If they gave a schema, then we try and figure out the types of the partition columns
// from that schema.
val partitionSchema = userSpecifiedSchema.map { schema =>
StructType(
partitionColumns.map { c =>
// TODO: Case sensitivity.
schema
.find(_.name.toLowerCase() == c.toLowerCase())
.getOrElse(throw new AnalysisException(s"Invalid partition column '$c'"))
})
}

val fileCatalog: FileCatalog =
new HDFSFileCatalog(sqlContext, options, globbedPaths, partitionSchema)
val dataSchema = userSpecifiedSchema.orElse {
format.inferSchema(
sqlContext,
@@ -220,22 +233,11 @@
"It must be specified manually")
}

// If they gave a schema, then we try and figure out the types of the partition columns
// from that schema.
val partitionSchema = userSpecifiedSchema.map { schema =>
StructType(
partitionColumns.map { c =>
// TODO: Case sensitivity.
schema
.find(_.name.toLowerCase() == c.toLowerCase())
.getOrElse(throw new AnalysisException(s"Invalid partition column '$c'"))
})
}.getOrElse(fileCatalog.partitionSpec(None).partitionColumns)

HadoopFsRelation(
sqlContext,
fileCatalog,
partitionSchema = partitionSchema,
partitionSchema = fileCatalog.partitionSpec().partitionColumns,
dataSchema = dataSchema.asNullable,
bucketSpec = bucketSpec,
format,
@@ -296,7 +298,7 @@ case class DataSource(
resolveRelation()
.asInstanceOf[HadoopFsRelation]
.location
.partitionSpec(None)
.partitionSpec()
.partitionColumns
.fieldNames
.toSet)
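
The partition-schema lookup moved above simply matches each declared partition column name against the user-specified schema, case-insensitively for now (per the TODO). A standalone sketch of that lookup, with illustrative column names and sys.error standing in for AnalysisException:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object PartitionSchemaSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative user-specified schema and partition columns.
    val userSchema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("date", StringType),
      StructField("country", StringType)))
    val partitionColumns = Seq("date", "country")

    // Pick each partition column's StructField out of the user schema by name.
    val partitionSchema = StructType(partitionColumns.map { c =>
      userSchema
        .find(_.name.toLowerCase() == c.toLowerCase())
        .getOrElse(sys.error(s"Invalid partition column '$c'"))
    })
    println(partitionSchema)
  }
}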
@@ -126,7 +126,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val partitionAndNormalColumnFilters =
filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet

val selectedPartitions = prunePartitions(partitionFilters, t.partitionSpec).toArray
val selectedPartitions = t.location.listFiles(partitionFilters)

logInfo {
val total = t.partitionSpec.partitions.length
@@ -180,7 +180,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
t.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))

t.bucketSpec match {
case Some(spec) if t.sqlContext.conf.bucketingEnabled() =>
case Some(spec) if t.sqlContext.conf.bucketingEnabled =>
val scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow] = {
(requiredColumns: Seq[Attribute], filters: Array[Filter]) => {
val bucketed =
@@ -200,7 +200,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
requiredColumns.map(_.name).toArray,
filters,
None,
bucketFiles.toArray,
bucketFiles,
confBroadcast,
t.options).coalesce(1)
}
@@ -233,7 +233,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
a.map(_.name).toArray,
f,
None,
t.location.allFiles().toArray,
t.location.allFiles(),
confBroadcast,
t.options)) :: Nil
}
@@ -255,7 +255,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
filters: Seq[Expression],
buckets: Option[BitSet],
partitionColumns: StructType,
partitions: Array[Partition],
partitions: Seq[Partition],
options: Map[String, String]): SparkPlan = {
val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]

@@ -272,14 +272,13 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
(requiredColumns: Seq[Attribute], filters: Array[Filter]) => {

relation.bucketSpec match {
case Some(spec) if relation.sqlContext.conf.bucketingEnabled() =>
case Some(spec) if relation.sqlContext.conf.bucketingEnabled =>
val requiredDataColumns =
requiredColumns.filterNot(c => partitionColumnNames.contains(c.name))

// Builds RDD[Row]s for each selected partition.
val perPartitionRows: Seq[(Int, RDD[InternalRow])] = partitions.flatMap {
case Partition(partitionValues, dir) =>
val files = relation.location.getStatus(dir)
case Partition(partitionValues, files) =>
val bucketed = files.groupBy { f =>
BucketingUtils
.getBucketId(f.getPath.getName)
@@ -327,14 +326,14 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {

// Builds RDD[Row]s for each selected partition.
val perPartitionRows = partitions.map {
case Partition(partitionValues, dir) =>
case Partition(partitionValues, files) =>
val dataRows = relation.fileFormat.buildInternalScan(
relation.sqlContext,
relation.dataSchema,
requiredDataColumns.map(_.name).toArray,
filters,
buckets,
relation.location.getStatus(dir),
files,
confBroadcast,
options)

@@ -525,33 +524,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
if (matchedBuckets.cardinality() == 0) None else Some(matchedBuckets)
}

protected def prunePartitions(
predicates: Seq[Expression],
partitionSpec: PartitionSpec): Seq[Partition] = {
val PartitionSpec(partitionColumns, partitions) = partitionSpec
val partitionColumnNames = partitionColumns.map(_.name).toSet
val partitionPruningPredicates = predicates.filter {
_.references.map(_.name).toSet.subsetOf(partitionColumnNames)
}

if (partitionPruningPredicates.nonEmpty) {
val predicate =
partitionPruningPredicates
.reduceOption(expressions.And)
.getOrElse(Literal(true))

val boundPredicate = InterpretedPredicate.create(predicate.transform {
case a: AttributeReference =>
val index = partitionColumns.indexWhere(a.name == _.name)
BoundReference(index, partitionColumns(index).dataType, nullable = true)
})

partitions.filter { case Partition(values, _) => boundPredicate(values) }
} else {
partitions
}
}
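
For readers following this removal: prunePartitions kept only the predicates whose references are all partition columns, bound them against each partition's values, and filtered; that responsibility now sits behind t.location.listFiles(partitionFilters). A simplified, self-contained sketch of the idea (types and names here are illustrative, not Spark's catalyst classes):

object PrunePartitionsSketch {
  // Stand-in for catalyst's Partition(values, path): partition values keyed by column name.
  case class SimplePartition(values: Map[String, Any], path: String)

  // Keep a partition only if every (partition-column-only) predicate holds on its values.
  def prunePartitions(
      partitions: Seq[SimplePartition],
      predicates: Seq[Map[String, Any] => Boolean]): Seq[SimplePartition] = {
    if (predicates.isEmpty) partitions
    else partitions.filter(p => predicates.forall(pred => pred(p.values)))
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      SimplePartition(Map("year" -> 2014), "/data/year=2014"),
      SimplePartition(Map("year" -> 2015), "/data/year=2015"))
    // e.g. a pushed-down filter equivalent to `year >= 2015`
    val pruned = prunePartitions(
      partitions,
      Seq(values => values("year").asInstanceOf[Int] >= 2015))
    println(pruned.map(_.path)) // List(/data/year=2015)
  }
}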

// Based on Public API.
protected def pruneFilterProject(
relation: LogicalRelation,
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow

/**
* A single file that should be read, along with partition column values that
* need to be prepended to each row. The reading should start at the first
* valid record found after `offset`.
*/
case class PartitionedFile(
partitionValues: InternalRow,
filePath: String,
start: Long,
length: Long)
Review comments on PartitionedFile:

Contributor: `offset` -> `start` (the doc comment above still says `offset`, but the field is named `start`).

Contributor: Should this have something about locations?

Contributor (author): Good question. We should add locality information here, but it probably belongs in FilePartition since that is the actual unit of work distribution. I'll add a comment that this strategy does not yet take locality into account.


/**
* A collection of files that should be read as a single task possibly from multiple partitioned
* directories.
*
* IMPLEMENT ME: This is just a placeholder for a future implementation.
*/
case class FilePartition(val index: Int, files: Seq[PartitionedFile]) extends Partition
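
To make the two new types concrete, here is one hypothetical way scan input could be grouped into FilePartitions once the strategy is filled in. This is a sketch only: the tuple shape of the input and the one-file-per-task packing are assumptions, not what the final planner does.

// Assumed input shape: (partition values to prepend, file path, file length in bytes).
def toFilePartitions(files: Seq[(InternalRow, String, Long)]): Seq[FilePartition] = {
  val partitionedFiles = files.map { case (values, path, length) =>
    // Whole-file reads for now; a real packing could also split large files.
    PartitionedFile(values, path, start = 0L, length = length)
  }
  // Naive packing: one file per task.
  partitionedFiles.zipWithIndex.map { case (file, i) => FilePartition(i, Seq(file)) }
}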

class FileScanRDD(
@transient val sqlContext: SQLContext,
readFunction: (PartitionedFile) => Iterator[InternalRow],
@transient val filePartitions: Seq[FilePartition])
extends RDD[InternalRow](sqlContext.sparkContext, Nil) {


override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
throw new NotImplementedError("Not Implemented Yet")
}

override protected def getPartitions: Array[Partition] = Array.empty
}
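
And a hypothetical wiring of the placeholder RDD, assuming a sqlContext and some format-specific row decoder are in scope. Since compute() above still throws NotImplementedError, this only illustrates the intended shape of the API.

// A readFunction is expected to open file.filePath at file.start, read file.length
// bytes, decode rows, and prepend file.partitionValues; here it is just a stub.
val readFunction: PartitionedFile => Iterator[InternalRow] = { file =>
  Iterator.empty
}

val scanRdd = new FileScanRDD(sqlContext, readFunction, filePartitions = Nil)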