[SPARK-13664][SQL] Add a strategy for planning partitioned and bucketed scans of files

This PR adds a new strategy, `FileSourceStrategy`, that can be used for planning scans of collections of files that might be partitioned or bucketed.
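
For context, a planning strategy in Spark SQL is an object with an `apply(plan: LogicalPlan): Seq[SparkPlan]` method: it pattern-matches on the logical plan and returns candidate physical plans, or `Nil` to let the next strategy try. The skeleton below only illustrates that contract; the object name and comments are illustrative and are not the contents of `FileSourceStrategy`, and the `Strategy` import path may differ slightly between Spark versions.

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Illustrative skeleton of the Strategy contract, not the real FileSourceStrategy.
object ExampleFileStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // A real file strategy would match scans over file-based relations here,
    // prune partitions, pack files into tasks, and return the planned scan.
    case _ => Nil // returning Nil tells the planner to try the next strategy
  }
}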

Compared with the existing planning logic in `DataSourceStrategy`, this version has the following desirable properties:
 - It removes the need to have `RDD`, `broadcastedHadoopConf`, and other distributed concerns in the public API of `org.apache.spark.sql.sources.FileFormat`.
 - Appending partition columns is delegated to the format, avoiding an extra copy / devectorization step.
 - It minimizes the amount of data shipped to each executor (i.e. it does not send the whole list of files to every worker in the form of a Hadoop conf).
 - It natively supports bucketing files into partitions, and thus does not require coalescing / creating a `UnionRDD` with the correct partitioning.
 - Small files are automatically coalesced into fewer tasks using an approximate bin-packing algorithm (a sketch of the idea follows this list).
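
To make the bin-packing bullet concrete, here is a rough, self-contained sketch of the idea; the names and the `targetBytesPerTask` parameter are illustrative and the actual packing in `FileSourceStrategy` differs in detail. Files are sorted largest-first and packed greedily into tasks until a task reaches the target size.

object FilePacking {
  case class FileToRead(path: String, sizeInBytes: Long)
  case class ReadTask(files: Seq[FileToRead])

  // Greedy, approximate bin-packing: largest files first, so big files anchor
  // their own tasks and small files fill the remaining space.
  def packIntoTasks(files: Seq[FileToRead], targetBytesPerTask: Long): Seq[ReadTask] = {
    val tasks = scala.collection.mutable.ArrayBuffer.empty[ReadTask]
    var current = Vector.empty[FileToRead]
    var currentBytes = 0L
    files.sortBy(f => -f.sizeInBytes).foreach { file =>
      // Close the current task once adding this file would exceed the target size.
      if (current.nonEmpty && currentBytes + file.sizeInBytes > targetBytesPerTask) {
        tasks += ReadTask(current)
        current = Vector.empty
        currentBytes = 0L
      }
      current = current :+ file
      currentBytes += file.sizeInBytes
    }
    if (current.nonEmpty) tasks += ReadTask(current)
    tasks.toSeq
  }
}

For instance, calling `FilePacking.packIntoTasks(files, 128 * 1024 * 1024)` would aim for tasks of roughly 128 MB of input each.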

Currently only a testing source is planned and tested using this strategy. In follow-up PRs we will port the existing formats to this API.

A stub for `FileScanRDD` is also added, but most methods remain unimplemented.

Other minor cleanups:
 - Partition pruning is pushed into `FileCatalog` so that both the new and old code paths can use this logic. This will also allow future implementations to use indexes or other tricks (e.g. a MySQL metastore); a sketch of the resulting catalog interface follows this list.
 - The partitions from the `FileCatalog` now propagate information about file sizes all the way up to the planner so we can intelligently spread files out.
 - `Array` -> `Seq` in some internal APIs to avoid unnecessary `toArray` calls
 - Rename `Partition` to `PartitionDirectory` to differentiate partitions used earlier in pruning from those where we have already enumerated the files and their sizes.
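
As referenced in the pruning bullet above, here is a hedged sketch of the shape the catalog interface takes after this change. The `Example*` names are illustrative; the method names and the two partition shapes follow the pattern matches in the diff below, but this is not the exact source.

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression

// Illustrative shapes only. A pruning-stage partition still points at a
// directory, while the partition handed to the planner already carries the
// enumerated FileStatus objects (and therefore the file sizes used to spread
// files across tasks).
case class ExamplePartitionDirectory(values: InternalRow, path: Path)
case class ExamplePartition(values: InternalRow, files: Seq[FileStatus])

trait ExampleFileCatalog {
  // Partition pruning now happens behind this call: only predicates that
  // reference partition columns are used to narrow the listing.
  def listFiles(partitionFilters: Seq[Expression]): Seq[ExamplePartition]

  // Unpruned listing, as used by the non-partitioned scan path in the diff.
  def allFiles(): Seq[FileStatus]
}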

Author: Michael Armbrust <michael@databricks.com>

Closes #11646 from marmbrus/fileStrategy.
marmbrus committed Mar 15, 2016
1 parent 992142b commit 17eec0a
Showing 22 changed files with 805 additions and 86 deletions.
@@ -150,7 +150,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
requiredColumns: Array[String],
filters: Array[Filter],
bucketSet: Option[BitSet],
inputFiles: Array[FileStatus],
inputFiles: Seq[FileStatus],
broadcastedConf: Broadcast[SerializableConfiguration],
options: Map[String, String]): RDD[InternalRow] = {
// TODO: This does not handle cases where column pruning has been performed.
@@ -17,8 +17,22 @@

package org.apache.spark.sql.catalyst

import org.apache.spark.sql.catalyst.analysis._

private[spark] trait CatalystConf {
def caseSensitiveAnalysis: Boolean

/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
*/
def resolver: Resolver = {
if (caseSensitiveAnalysis) {
caseSensitiveResolution
} else {
caseInsensitiveResolution
}
}
}

/**
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
import org.apache.spark.sql.types.StructType


abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
@@ -116,6 +117,23 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {

override lazy val canonicalized: LogicalPlan = EliminateSubqueryAliases(this)

/**
* Resolves a given schema to concrete [[Attribute]] references in this query plan. This function
* should only be called on analyzed plans since it will throw [[AnalysisException]] for
* unresolved [[Attribute]]s.
*/
def resolve(schema: StructType, resolver: Resolver): Seq[Attribute] = {
schema.map { field =>
resolveQuoted(field.name, resolver).map {
case a: AttributeReference => a
case other => sys.error(s"can not handle nested schema yet... plan $this")
}.getOrElse {
throw new AnalysisException(
s"Unable to resolve ${field.name} given [${output.map(_.name).mkString(", ")}]")
}
}
}

/**
* Optionally resolves the given strings to a [[NamedExpression]] using the input from all child
* nodes of this LogicalPlan. The attribute is expressed as
@@ -151,7 +151,7 @@ private[sql] case class DataSourceScan(
override val outputPartitioning = {
val bucketSpec = relation match {
// TODO: this should be closer to bucket planning.
case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled() => r.bucketSpec
case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled => r.bucketSpec
case _ => None
}

@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy}

class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
val sparkContext: SparkContext = sqlContext.sparkContext
@@ -29,6 +29,7 @@ class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {

def strategies: Seq[Strategy] =
sqlContext.experimental.extraStrategies ++ (
FileSourceStrategy ::
DataSourceStrategy ::
DDLStrategy ::
SpecialLimits ::
@@ -143,7 +143,7 @@ case class DataSource(
SparkHadoopUtil.get.globPathIfNecessary(qualified)
}.toArray

val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths)
val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths, None)
val dataSchema = userSpecifiedSchema.orElse {
format.inferSchema(
sqlContext,
@@ -208,7 +208,20 @@ case class DataSource(
SparkHadoopUtil.get.globPathIfNecessary(qualified)
}.toArray

val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths)
// If they gave a schema, then we try and figure out the types of the partition columns
// from that schema.
val partitionSchema = userSpecifiedSchema.map { schema =>
StructType(
partitionColumns.map { c =>
// TODO: Case sensitivity.
schema
.find(_.name.toLowerCase() == c.toLowerCase())
.getOrElse(throw new AnalysisException(s"Invalid partition column '$c'"))
})
}

val fileCatalog: FileCatalog =
new HDFSFileCatalog(sqlContext, options, globbedPaths, partitionSchema)
val dataSchema = userSpecifiedSchema.orElse {
format.inferSchema(
sqlContext,
@@ -220,22 +233,11 @@ case class DataSource(
"It must be specified manually")
}

// If they gave a schema, then we try and figure out the types of the partition columns
// from that schema.
val partitionSchema = userSpecifiedSchema.map { schema =>
StructType(
partitionColumns.map { c =>
// TODO: Case sensitivity.
schema
.find(_.name.toLowerCase() == c.toLowerCase())
.getOrElse(throw new AnalysisException(s"Invalid partition column '$c'"))
})
}.getOrElse(fileCatalog.partitionSpec(None).partitionColumns)

HadoopFsRelation(
sqlContext,
fileCatalog,
partitionSchema = partitionSchema,
partitionSchema = fileCatalog.partitionSpec().partitionColumns,
dataSchema = dataSchema.asNullable,
bucketSpec = bucketSpec,
format,
@@ -296,7 +298,7 @@ case class DataSource(
resolveRelation()
.asInstanceOf[HadoopFsRelation]
.location
.partitionSpec(None)
.partitionSpec()
.partitionColumns
.fieldNames
.toSet)
@@ -126,7 +126,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val partitionAndNormalColumnFilters =
filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet

val selectedPartitions = prunePartitions(partitionFilters, t.partitionSpec).toArray
val selectedPartitions = t.location.listFiles(partitionFilters)

logInfo {
val total = t.partitionSpec.partitions.length
@@ -180,7 +180,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
t.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))

t.bucketSpec match {
case Some(spec) if t.sqlContext.conf.bucketingEnabled() =>
case Some(spec) if t.sqlContext.conf.bucketingEnabled =>
val scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow] = {
(requiredColumns: Seq[Attribute], filters: Array[Filter]) => {
val bucketed =
@@ -200,7 +200,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
requiredColumns.map(_.name).toArray,
filters,
None,
bucketFiles.toArray,
bucketFiles,
confBroadcast,
t.options).coalesce(1)
}
@@ -233,7 +233,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
a.map(_.name).toArray,
f,
None,
t.location.allFiles().toArray,
t.location.allFiles(),
confBroadcast,
t.options)) :: Nil
}
@@ -255,7 +255,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
filters: Seq[Expression],
buckets: Option[BitSet],
partitionColumns: StructType,
partitions: Array[Partition],
partitions: Seq[Partition],
options: Map[String, String]): SparkPlan = {
val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]

@@ -272,14 +272,13 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
(requiredColumns: Seq[Attribute], filters: Array[Filter]) => {

relation.bucketSpec match {
case Some(spec) if relation.sqlContext.conf.bucketingEnabled() =>
case Some(spec) if relation.sqlContext.conf.bucketingEnabled =>
val requiredDataColumns =
requiredColumns.filterNot(c => partitionColumnNames.contains(c.name))

// Builds RDD[Row]s for each selected partition.
val perPartitionRows: Seq[(Int, RDD[InternalRow])] = partitions.flatMap {
case Partition(partitionValues, dir) =>
val files = relation.location.getStatus(dir)
case Partition(partitionValues, files) =>
val bucketed = files.groupBy { f =>
BucketingUtils
.getBucketId(f.getPath.getName)
@@ -327,14 +326,14 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {

// Builds RDD[Row]s for each selected partition.
val perPartitionRows = partitions.map {
case Partition(partitionValues, dir) =>
case Partition(partitionValues, files) =>
val dataRows = relation.fileFormat.buildInternalScan(
relation.sqlContext,
relation.dataSchema,
requiredDataColumns.map(_.name).toArray,
filters,
buckets,
relation.location.getStatus(dir),
files,
confBroadcast,
options)

Expand Down Expand Up @@ -525,33 +524,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
if (matchedBuckets.cardinality() == 0) None else Some(matchedBuckets)
}

protected def prunePartitions(
predicates: Seq[Expression],
partitionSpec: PartitionSpec): Seq[Partition] = {
val PartitionSpec(partitionColumns, partitions) = partitionSpec
val partitionColumnNames = partitionColumns.map(_.name).toSet
val partitionPruningPredicates = predicates.filter {
_.references.map(_.name).toSet.subsetOf(partitionColumnNames)
}

if (partitionPruningPredicates.nonEmpty) {
val predicate =
partitionPruningPredicates
.reduceOption(expressions.And)
.getOrElse(Literal(true))

val boundPredicate = InterpretedPredicate.create(predicate.transform {
case a: AttributeReference =>
val index = partitionColumns.indexWhere(a.name == _.name)
BoundReference(index, partitionColumns(index).dataType, nullable = true)
})

partitions.filter { case Partition(values, _) => boundPredicate(values) }
} else {
partitions
}
}

// Based on Public API.
protected def pruneFilterProject(
relation: LogicalRelation,
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow

/**
* A single file that should be read, along with partition column values that
* need to be prepended to each row. The reading should start at the first
* valid record found after `start`.
*/
case class PartitionedFile(
partitionValues: InternalRow,
filePath: String,
start: Long,
length: Long)

/**
* A collection of files that should be read as a single task possibly from multiple partitioned
* directories.
*
* IMPLEMENT ME: This is just a placeholder for a future implementation.
* TODO: This currently does not take locality information about the files into account.
*/
case class FilePartition(val index: Int, files: Seq[PartitionedFile]) extends Partition

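/**
* Descriptive note (not part of the original commit): `compute` is not implemented yet.
* Based on the constructor signature, each FilePartition is intended to be read by
* applying `readFunction` to every PartitionedFile it contains.
*/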
class FileScanRDD(
@transient val sqlContext: SQLContext,
readFunction: (PartitionedFile) => Iterator[InternalRow],
@transient val filePartitions: Seq[FilePartition])
extends RDD[InternalRow](sqlContext.sparkContext, Nil) {


override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
throw new NotImplementedError("Not Implemented Yet")
}

override protected def getPartitions: Array[Partition] = Array.empty
}