more cleanup
marmbrus committed Mar 11, 2016
1 parent 9990d41 commit 4f29845
Showing 12 changed files with 319 additions and 50 deletions.
@@ -150,7 +150,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
requiredColumns: Array[String],
filters: Array[Filter],
bucketSet: Option[BitSet],
- inputFiles: Array[FileStatus],
+ inputFiles: Seq[FileStatus],
broadcastedConf: Broadcast[SerializableConfiguration],
options: Map[String, String]): RDD[InternalRow] = {
// TODO: This does not handle cases where column pruning has been performed.
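Widening `inputFiles` from `Array[FileStatus]` to `Seq[FileStatus]` loosens the parameter's contract. A hedged sketch of why this stays source-compatible for callers; the names `fs` and `dir` below are hypothetical and not part of this diff:

// Hypothetical caller (types from org.apache.hadoop.fs): FileSystem.listStatus returns
// Array[FileStatus], which Predef wraps as a Seq, so existing callers keep compiling
// while new callers may pass any ordered collection of FileStatus.
val statuses: Array[FileStatus] = fs.listStatus(dir)
val asSeq: Seq[FileStatus] = statuses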
@@ -241,7 +241,7 @@ private[sql] object PhysicalRDD {

val bucketSpec = relation match {
// TODO: this should be closer to bucket planning.
- case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled() => r.bucketSpec
+ case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled => r.bucketSpec
case _ => None
}

@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
- import org.apache.spark.sql.execution.datasources.{FileSourceStrategy, DataSourceStrategy}
+ import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy}

class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
val sparkContext: SparkContext = sqlContext.sparkContext
@@ -33,8 +33,8 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
- import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.PhysicalRDD.{INPUT_PATHS, PUSHED_FILTERS}
+ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.ExecutedCommand
import org.apache.spark.sql.execution.vectorized.{ColumnarBatch, ColumnVectorUtils}
import org.apache.spark.sql.sources._
@@ -180,7 +180,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
t.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))

t.bucketSpec match {
- case Some(spec) if t.sqlContext.conf.bucketingEnabled() =>
+ case Some(spec) if t.sqlContext.conf.bucketingEnabled =>
val scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow] = {
(requiredColumns: Seq[Attribute], filters: Array[Filter]) => {
val bucketed =
@@ -272,7 +272,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
(requiredColumns: Seq[Attribute], filters: Array[Filter]) => {

relation.bucketSpec match {
- case Some(spec) if relation.sqlContext.conf.bucketingEnabled() =>
+ case Some(spec) if relation.sqlContext.conf.bucketingEnabled =>
val requiredDataColumns =
requiredColumns.filterNot(c => partitionColumnNames.contains(c.name))

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow

/**
* A single file that should be read, along with partition column values that
* need to be prepended to each row. The reading should start at the first
* valid record found after `start`.
*/
case class PartitionedFile(
partitionValues: InternalRow,
filePath: String,
start: Long,
length: Long)

/**
* A collection of files that should be read as a single task possibly from multiple partitioned
* directories.
*
* IMPLEMENT ME: This is just a placeholder for a future implementation.
*/
case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends Partition

class FileScanRDD(
@transient val sqlContext: SQLContext,
readFunction: (PartitionedFile) => Iterator[InternalRow],
@transient val filePartitions: Seq[FilePartition])
extends RDD[InternalRow](sqlContext.sparkContext, Nil) {


override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
throw new NotImplementedError("Not Implemented Yet")
}

override protected def getPartitions: Array[Partition] = Array.empty
}
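`compute` is deliberately left as a placeholder in this commit. For orientation only, here is a hedged sketch of how the two stubbed methods might eventually be filled in, assuming each task simply chains `readFunction` over the files of its `FilePartition`; this is not the commit's code, nor necessarily the later implementation:

  // Sketch only: each split is a FilePartition carrying the files assigned to this task.
  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    val filePartition = split.asInstanceOf[FilePartition]
    // Lazily read each file and concatenate the per-file row iterators.
    filePartition.files.iterator.flatMap(readFunction)
  }

  // Sketch only: expose the pre-planned file partitions to the scheduler.
  override protected def getPartitions: Array[Partition] =
    filePartitions.toArray[Partition]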
@@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.Path

import org.apache.spark.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{PhysicalRDD, SparkPlan}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

/**
* A strategy for planning scans over collections of files that might be partitioned or bucketed
* by user specified columns.
*
* At a high level, planning proceeds in the following phases:
* - Split filters by when they need to be evaluated.
* - Prune the schema of the data requested, based on any projections present. Today this pruning
* is only done on top-level columns, but formats should support pruning of nested columns as
* well.
* - Construct a reader function by passing filters and the schema into the FileFormat.
* - Using the partition pruning predicates, enumerate the list of files that should be read.
* - Split the files into tasks and construct a FileScanRDD.
* - Add any projections or filters that must be evaluated after the scan.
*
* Files are assigned to tasks using the following algorithm:
* - If the table is bucketed: group files by bucket id into the correct number of partitions.
* - If the table is not bucketed or bucketing is turned off:
* - If any file is larger than the threshold, split it into pieces based on that threshold.
* - Sort the files by decreasing size.
* - Assign the ordered files to partitions greedily: if adding the next file keeps the current
* partition under the threshold, add it there; otherwise close the partition, open a new one,
* and add the file to it. Proceed to the next file.
*/
private[sql] object FileSourceStrategy extends Strategy with Logging {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(projects, filters, l@LogicalRelation(files: HadoopFsRelation, _, _))
if files.fileFormat.toString == "TestFileFormat" =>
// Filters on this relation fall into four categories based on where we can use them to avoid
// reading unneeded data:
// - partition keys only - used to prune directories to read
// - bucket keys only - optionally used to prune files to read
// - keys stored in the data only - optionally used to skip groups of data in files
// - filters that need to be evaluated again after the scan
val filterSet = ExpressionSet(filters)

val partitionColumns =
AttributeSet(l.resolve(files.partitionSchema, files.sqlContext.analyzer.resolver))
val partitionKeyFilters =
ExpressionSet(filters.filter(_.references.subsetOf(partitionColumns)))
logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")

val bucketColumns =
AttributeSet(
files.bucketSpec
.map(_.bucketColumnNames)
.getOrElse(Nil)
.map(l.resolveQuoted(_, files.sqlContext.conf.resolver)
.getOrElse(sys.error("Failed to resolve bucket column"))))

// Partition keys are not available in the statistics of the files.
val dataFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)

// Predicates with both partition keys and attributes need to be evaluated after the scan.
val afterScanFilters = filterSet -- partitionKeyFilters
logInfo(s"Post-Scan Filters: ${afterScanFilters.mkString(",")}")

val selectedPartitions = files.location.listFiles(partitionKeyFilters.toSeq)

val filterAttributes = AttributeSet(afterScanFilters)
val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq ++ projects
val requiredAttributes = AttributeSet(requiredExpressions).map(_.name).toSet

val prunedDataSchema =
StructType(
files.dataSchema.filter(f => requiredAttributes.contains(f.name)))
logInfo(s"Pruned Data Schema: ${prunedDataSchema.simpleString(5)}")

val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")

val readFile = files.fileFormat.buildReader(
sqlContext = files.sqlContext,
partitionSchema = files.partitionSchema,
dataSchema = prunedDataSchema,
filters = pushedDownFilters,
options = files.options)

val plannedPartitions = files.bucketSpec match {
case Some(bucketing) if files.sqlContext.conf.bucketingEnabled =>
logInfo(s"Planning with ${bucketing.numBuckets} buckets")
val bucketed =
selectedPartitions
.flatMap { p =>
p.files.map(f => PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen))
}.groupBy { f =>
BucketingUtils
.getBucketId(new Path(f.filePath).getName)
.getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
}

(0 until bucketing.numBuckets).map { bucketId =>
FilePartition(bucketId, bucketed.getOrElse(bucketId, Nil))
}

case _ =>
val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes")

val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
assert(file.getLen != 0)
(0L until file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
PartitionedFile(partition.values, file.getPath.toUri.toString, offset, size)
}
}
}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

val partitions = new ArrayBuffer[FilePartition]
val currentFiles = new ArrayBuffer[PartitionedFile]
var currentSize = 0L

/** Add the given file to the current partition. */
def addFile(file: PartitionedFile): Unit = {
currentSize += file.length
currentFiles.append(file)
}

/** Close the current partition and move to the next. */
def closePartition(): Unit = {
if (currentFiles.nonEmpty) {
val newPartition =
FilePartition(
partitions.size,
currentFiles.toArray.toSeq) // Copy to a new Array.
partitions.append(newPartition)
}
currentFiles.clear()
currentSize = 0
}

// Assign files to partitions using "First Fit Decreasing" (FFD)
// TODO: consider adding a slop factor here?
splitFiles.foreach { file =>
if (currentSize + file.length > maxSplitBytes) {
closePartition()
addFile(file)
} else {
addFile(file)
}
}
closePartition()
partitions
}

val scan =
PhysicalRDD(
l.output,
new FileScanRDD(
files.sqlContext,
readFile,
plannedPartitions),
"FileScan",
Map("format" -> files.fileFormat.toString))

val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
val withFilter = afterScanFilter.map(execution.Filter(_, scan)).getOrElse(scan)
val withProjections = if (projects.forall(_.isInstanceOf[AttributeReference])) {
withFilter
} else {
execution.Project(projects, withFilter)
}

withProjections :: Nil

case _ => Nil
}
}
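The class comment above describes how non-bucketed files are packed into tasks by decreasing size, and the `case _` branch implements it over `PartitionedFile`s. Strictly, because only the current partition is ever filled, the greedy loop behaves like "Next Fit Decreasing" rather than classic First Fit Decreasing. Below is a self-contained, hedged sketch of the same packing on plain byte counts; every name in it is illustrative and not part of the commit.

import scala.collection.mutable.ArrayBuffer

object FilePackingSketch {
  /** Pack sizes into bins of at most maxBytes, filling only the current bin,
    * in decreasing size order; oversized items simply get a bin of their own. */
  def pack(sizes: Seq[Long], maxBytes: Long): Seq[Seq[Long]] = {
    val bins = new ArrayBuffer[Seq[Long]]
    val current = new ArrayBuffer[Long]
    var currentSize = 0L

    // Close the current bin and start a new one.
    def closeBin(): Unit = {
      if (current.nonEmpty) bins.append(current.toList)
      current.clear()
      currentSize = 0L
    }

    sizes.sorted(Ordering[Long].reverse).foreach { size =>
      if (currentSize + size > maxBytes) closeBin()
      currentSize += size
      current.append(size)
    }
    closeBin()
    bins.toSeq
  }

  def main(args: Array[String]): Unit = {
    // With a 10-byte cap, sizes 3, 7, 2, 6, 5 pack as [7], [6], [5, 3, 2].
    println(pack(Seq(3L, 7L, 2L, 6L, 5L), maxBytes = 10L))
  }
}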
@@ -543,7 +543,7 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Logging {

/** ************************ Spark SQL Params/Hints ******************* */

- def filesMaxPartitionBytes = getConf(FILES_MAX_PARTITION_BYTES)
+ def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)

def useCompression: Boolean = getConf(COMPRESS_CACHED)

@@ -612,7 +612,7 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Logging {
def parallelPartitionDiscoveryThreshold: Int =
getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)

- def bucketingEnabled(): Boolean = getConf(SQLConf.BUCKETING_ENABLED)
+ def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)

// Do not use a value larger than 4000 as the default value of this property.
// See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
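With the parentheses dropped, `bucketingEnabled` is a parameterless accessor, matching the call sites updated earlier in this commit. A hedged usage sketch follows; it assumes the flag's key is "spark.sql.sources.bucketing.enabled", which this diff does not show.

// Assumed key name; the constant SQLConf.BUCKETING_ENABLED itself is not shown in this diff.
sqlContext.setConf("spark.sql.sources.bucketing.enabled", "false")  // plan file scans without bucket pruning
sqlContext.sql("SET spark.sql.sources.bucketing.enabled=true")      // re-enable via SQL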