
[SPARK-13664][SQL] Add a strategy for planning partitioned and bucketed scans of files #11646

Closed
wants to merge 10 commits

Conversation

marmbrus
Contributor

This PR adds a new strategy, FileSourceStrategy, that can be used for planning scans of collections of files that might be partitioned or bucketed.

Compared with the existing planning logic in DataSourceStrategy this version has the following desirable properties:

  • It removes the need to have RDD, broadcastedHadoopConf and other distributed concerns in the public API of org.apache.spark.sql.sources.FileFormat
  • Partition column appending is delegated to the format to avoid an extra copy / devectorization when appending partition columns
  • It minimizes the amount of data that is shipped to each executor (i.e. it does not send the whole list of files to every worker in the form of a hadoop conf)
  • It natively supports bucketing files into partitions, and thus does not require coalescing / creating a UnionRDD with the correct partitioning.
  • Small files are automatically coalesced into fewer tasks using an approximate bin-packing algorithm (see the sketch after this list).
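
Below is a minimal, self-contained sketch of the greedy bin-packing idea, for illustration only; the names (FileSplit, packSplits, maxBytesPerPartition) are hypothetical and are not the identifiers used in this PR.

// Hypothetical sketch: greedily pack file splits into partitions so that each
// partition holds roughly `maxBytesPerPartition` bytes. Not this PR's actual code.
case class FileSplit(path: String, start: Long, length: Long)

def packSplits(splits: Seq[FileSplit], maxBytesPerPartition: Long): Seq[Seq[FileSplit]] = {
  val partitions = Seq.newBuilder[Seq[FileSplit]]
  var current = Seq.newBuilder[FileSplit]
  var currentSize = 0L

  // Visiting larger splits first tends to give a tighter packing.
  splits.sortBy(-_.length).foreach { split =>
    if (currentSize > 0 && currentSize + split.length > maxBytesPerPartition) {
      partitions += current.result()
      current = Seq.newBuilder[FileSplit]
      currentSize = 0L
    }
    current += split
    currentSize += split.length
  }
  if (currentSize > 0) partitions += current.result()
  partitions.result()
}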

Currently only a testing source is planned / tested using this strategy. In follow-up PRs we will port the existing formats to this API.

A stub for FileScanRDD is also added, but most methods remain unimplemented.

Other minor cleanups:

  • Partition pruning is pushed into FileCatalog so both the new and old code paths can use this logic. This will also allow future implementations to use indexes or other tricks (e.g. a MySQL metastore)
  • The partitions from the FileCatalog now propagate information about file sizes all the way up to the planner so we can intelligently spread files out.
  • Array -> Seq in some internal APIs to avoid unnecessary toArray calls
  • Rename Partition to PartitionDirectory to differentiate partitions used earlier in pruning from those where we have already enumerated the files and their sizes.

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@marmbrus
Contributor Author

/cc @cloud-fan @nongli @davies

@SparkQA

SparkQA commented Mar 11, 2016

Test build #52899 has finished for PR 11646 at commit 4f29845.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PartitionedFile(
    • case class FilePartition(val index: Int, files: Seq[PartitionedFile]) extends Partition
    • class FileScanRDD(
    • case class Partition(values: InternalRow, files: Seq[FileStatus])

@SparkQA

SparkQA commented Mar 11, 2016

Test build #52900 has finished for PR 11646 at commit 7ad3119.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

def partitionSpec(schema: Option[StructType]): PartitionSpec
def partitionSpec(): PartitionSpec

def listFiles(filters: Seq[Expression]): Seq[Partition]
Contributor

Is this used to do partition pruning?

Contributor

The name looks confusing: it's called listFiles but returns a Seq[Partition]. Maybe we should call it listPartitions?

Contributor Author

I'll add some comments about the filters. I called it listFiles because it actually enumerates all of the files for each partition internally (unlike prunePartitions, which only selects the partition directories).
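
To make the naming concrete, here is a tiny self-contained conceptual model, not Spark's actual classes (PartitionDir, PartitionFiles, and the predicate are made up for illustration): pruning only selects partition directories by their partition-column values, while listFiles additionally enumerates the files, and therefore the sizes, under each surviving directory.

import java.io.File

// Conceptual stand-ins for illustration; not the PR's types.
case class PartitionDir(values: Map[String, String], path: File)
case class PartitionFiles(values: Map[String, String], files: Seq[File])

def listFilesSketch(
    allPartitions: Seq[PartitionDir],
    pruningPredicate: Map[String, String] => Boolean): Seq[PartitionFiles] = {
  // Prune first: keep only directories whose partition values satisfy the predicate.
  val selected = allPartitions.filter(p => pruningPredicate(p.values))
  // Then enumerate the files (and hence their sizes) under each surviving directory,
  // which is why the real method is named listFiles rather than listPartitions.
  selected.map { p =>
    val files = Option(p.path.listFiles()).map(_.toSeq.filter(_.isFile)).getOrElse(Nil)
    PartitionFiles(p.values, files)
  }
}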

partitionSchema: StructType,
dataSchema: StructType,
filters: Seq[Filter],
options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
Contributor Author

@rxin @nongli

Should we make this our own iterator (i.e. with close() but without hasNext())?

Contributor

How can this API support row batch?

Contributor Author

The same way as before (relying on erasure). Before we make this public, we should decide whether we want all external sources to return batches or to have two interfaces.
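
As a rough illustration of the pattern being discussed, the driver builds a function once from the schema and options, and the returned closure is shipped to executors and invoked per file; the sketch below uses simplified stand-in types (PartitionedFileStub, String rows) rather than Spark's internals. Note how a plain Iterator gives no hook for closing the underlying source, which is exactly the close() concern raised above.

import scala.io.Source

// Simplified stand-in for the real PartitionedFile; for illustration only.
case class PartitionedFileStub(path: String, start: Long, length: Long)

// The factory runs on the driver: options are resolved once, up front. The
// returned closure is serialized to executors and called once per file (or split).
def buildReaderSketch(options: Map[String, String]): PartitionedFileStub => Iterator[String] = {
  val delimiter = options.getOrElse("delimiter", ",")
  file => {
    // All per-file work happens lazily inside the closure. A plain Iterator offers
    // no close() for the underlying source, which is the concern raised above.
    Source.fromFile(file.path).getLines().map(_.split(delimiter).mkString(" | "))
  }
}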

@SparkQA

SparkQA commented Mar 11, 2016

Test build #52939 has finished for PR 11646 at commit 65596df.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 11, 2016

Test build #52946 has finished for PR 11646 at commit 35be8d5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val scan =
  PhysicalRDD(
    l.output,
    new FileScanRDD(
Contributor

Should we move this part (creating FileScanRDD) into DataSourceScan? We could have a new FileScan for this rule.

Contributor Author

Good idea, that would be cleaner. We can't do that until we get rid of the old code path though (since there are two different types of RDDs we want to create).

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@davies
Contributor

davies commented Mar 14, 2016

LGTM

@davies
Contributor

davies commented Mar 14, 2016

Don't forget to create a JIRA for this.

@marmbrus marmbrus changed the title [SPARK-XXXX][SQL] Add a strategy for planning partitioned and bucketed scans of files [SPARK-13664][SQL] Add a strategy for planning partitioned and bucketed scans of files Mar 14, 2016
@SparkQA

SparkQA commented Mar 14, 2016

Test build #53123 has finished for PR 11646 at commit 4744b97.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class NativeDDLCommand(val sql: String) extends RunnableCommand
    • case class CreateDatabase(
    • case class CreateFunction(
    • case class AlterTableRename(
    • case class AlterTableSetProperties(
    • case class AlterTableUnsetProperties(
    • case class AlterTableSerDeProperties(
    • case class AlterTableStorageProperties(
    • case class AlterTableNotClustered(
    • case class AlterTableNotSorted(
    • case class AlterTableSkewed(
    • case class AlterTableNotSkewed(
    • case class AlterTableNotStoredAsDirs(
    • case class AlterTableSkewedLocation(
    • case class AlterTableAddPartition(
    • case class AlterTableRenamePartition(
    • case class AlterTableExchangePartition(
    • case class AlterTableDropPartition(
    • case class AlterTableArchivePartition(
    • case class AlterTableUnarchivePartition(
    • case class AlterTableSetFileFormat(
    • case class AlterTableSetLocation(
    • case class AlterTableTouch(
    • case class AlterTableCompact(
    • case class AlterTableMerge(
    • case class AlterTableChangeCol(
    • case class AlterTableAddCol(
    • case class AlterTableReplaceCol(
    • case class In(attribute: String, values: Array[Any]) extends Filter

@marmbrus
Contributor Author

Thanks! Merging to master.

@asfgit asfgit closed this in 17eec0a Mar 15, 2016
private[sql] object FileSourceStrategy extends Strategy with Logging {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _))
        if files.fileFormat.toString == "TestFileFormat" =>
Contributor

what does this match for?

Contributor Author

Currently only the test format, but in a follow-up PR we'll add Parquet and eventually remove this check and the old code path.

@cloud-fan
Contributor

Sorry for the late review, LGTM.

/**
* A single file that should be read, along with partition column values that
* need to be prepended to each row. The reading should start at the first
* valid record found after `offset`.
Contributor

offset -> start

roygao94 pushed a commit to roygao94/spark that referenced this pull request Mar 22, 2016

Author: Michael Armbrust <michael@databricks.com>

Closes apache#11646 from marmbrus/fileStrategy.
val splitFiles = selectedPartitions.flatMap { partition =>
  partition.files.flatMap { file =>
    assert(file.getLen != 0)
    (0L to file.getLen by maxSplitBytes).map { offset =>
Contributor

Is it safe to split files ourselves? If it's a text file, how can we guarantee we don't break lines when splitting it?

Contributor

We found that at least ParquetRecordReader and LineRecordReader are designed to support splits that are cut at arbitrary positions. But is this a mandatory convention that applies to all Hadoop record readers?

Contributor

Aww... Unfortunately ORC doesn't obey this convention, and it breaks the ORC version of buildReader() implemented in PR #11936. It seems that we have to have FileFormat tell us how to generate file splits.

Contributor

An example snippet is shown in PR #11936 to illustrate the problem.

One obvious but naive "fix" for this is to include at least one whole file in each partition, but this doesn't work well with large input files.

Contributor

Sorry, false alarm... See here.
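
For background on why cutting a text file at arbitrary byte offsets can be safe, below is a small self-contained sketch of the usual record-reader convention (assumptions: single-byte text, '\n' line endings, one record per line; this follows the spirit of Hadoop's LineRecordReader but is not its actual code): each split except the first discards the first line it encounters, because the previous split reads past its own end to finish that line, so no line is lost or produced twice.

import java.io.RandomAccessFile
import scala.collection.mutable.ArrayBuffer

// Sketch of the convention only, not Hadoop's or Spark's implementation.
def readSplit(path: String, start: Long, length: Long): Seq[String] = {
  val end = start + length
  val in = new RandomAccessFile(path, "r")
  try {
    in.seek(start)
    if (start != 0) in.readLine()     // discard: the previous split owns this line
    val lines = ArrayBuffer[String]()
    var lineStart = in.getFilePointer
    var line = in.readLine()
    // Keep every line that starts at or before `end`, even if it extends past it;
    // lines starting after `end` belong to the next split.
    while (line != null && lineStart <= end) {
      lines += line
      lineStart = in.getFilePointer
      line = in.readLine()
    }
    lines.toSeq
  } finally in.close()
}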

val selectedPartitions = files.location.listFiles(partitionKeyFilters.toSeq)

val filterAttributes = AttributeSet(afterScanFilters)
val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq ++ projects
Contributor

What if there is no Project or Filter above the relation? We should read all columns, but here we treat it as if no columns are required.

Contributor Author

I don't think that's how it works. In PhysicalOperation, when there are no projections, we use the output of the child as the list of projections (i.e. all columns).
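
A tiny self-contained illustration of that behavior (simplified stand-ins, not Spark's PhysicalOperation; Column and requiredColumnsSketch are made up): when no Project sits above the relation, the collapsed projection defaults to the relation's full output, so the required columns end up being every column plus whatever the remaining filters reference.

// Simplified model for illustration only.
case class Column(name: String)

def requiredColumnsSketch(
    relationOutput: Seq[Column],
    projects: Option[Seq[Column]],     // None models "no Project above the relation"
    filterColumns: Set[Column]): Seq[Column] = {
  // PhysicalOperation-style collapse: with no explicit projection, fall back to
  // the child's full output, i.e. all columns.
  val effectiveProjects = projects.getOrElse(relationOutput)
  (effectiveProjects ++ filterColumns).distinct
}

// requiredColumnsSketch(Seq(Column("a"), Column("b")), None, Set.empty)
//   returns Seq(Column("a"), Column("b"))  -- all columns are read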
