[SPARK-17059][SQL] Allow FileFormat to specify partition pruning strategy #14649

Closed
wants to merge 5 commits into from

Conversation


@a10y a10y commented Aug 15, 2016

What changes were proposed in this pull request?

Different FileFormat implementations may be able to make intelligent decisions, based on the available pushedFilters, about which files actually need to be processed as part of a FileSourceScanExec. This PR implements a pluggable way to do that, with a Parquet implementation that applies the summary metadata to avoid creating tasks for files that cannot match.

This has a few performance benefits:

  1. Reading files is generally slow, especially from S3. The current Parquet implementation does not use the summary metadata, so footers are read directly from each file. This can be very slow for large Parquet datasets: S3 reads fetch the entire file by default, and random reads only become configurable from Hadoop 2.7 onwards (and are disabled by default there).
  2. Partitions that are found to contain no files can then be pruned out or coalesced, depending on the FileFormat's implementation, so that fewer tasks are created.
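
For illustration, a minimal sketch of the kind of hook this describes (the trait and method names below are illustrative, not the exact ones added by the patch):

    import org.apache.hadoop.fs.FileStatus
    import org.apache.spark.sql.sources.Filter

    // Illustrative only: a FileFormat mixing in this trait could inspect the pushed
    // filters and drop whole files up front, so that FileSourceScanExec never creates
    // tasks for them. The default keeps every file, preserving current behaviour.
    trait PrunesFilesWithPushedFilters {
      def pruneFiles(filters: Seq[Filter], files: Seq[FileStatus]): Seq[FileStatus] = files
    }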

How was this patch tested?

Existing tests and the Spark shell, plus a unit test for the Parquet implementation.

@SparkQA

SparkQA commented Aug 15, 2016

Test build #3223 has finished for PR 14649 at commit 1a00d1f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -703,6 +703,16 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
}

test("SPARK-17059: Allow Allow FileFormat to specify partition pruning strategy") {
Author

Typo will fix

@HyukjinKwon
Member

Also, if my understanding is correct, we pick up only a single file to read the footer (see ParquetFileFormat.scala#L217-L225) unless we merge schemas. So it seems that, for this reason, writing _metadata or _common_metadata is disabled (see https://issues.apache.org/jira/browse/SPARK-15719).

@a10y
Author

a10y commented Aug 16, 2016

Let me see if I can answer these in order:

So, if my understanding is correct, Parquet already filters row groups in both the normal reader and the vectorized reader (#13701). Is this doing the same thing on the Spark side?

Yep, this attempts to do the footer processing before launching tasks, if the _metadata file is available.

Also, doesn't this try to touch many files on the driver side?

This should only touch the _metadata file; it reads all the footers collected there to determine which file paths to prune out. It needs _metadata rather than _common_metadata because _common_metadata doesn't include footer info.

Also, if my understanding is correct, we pick up only a single file to read the footer (see ParquetFileFormat.scala#L217-L225) unless we merge schemas. So it seems that, for this reason, writing _metadata or _common_metadata is disabled (see https://issues.apache.org/jira/browse/SPARK-15719).

Yes, generation of the summary metadata is disabled by default, but it is configurable via the parquet.enable.summary-metadata conf parameter. If the _metadata file isn't found, the code simply falls back to the original partitioning structure and does no file pruning.
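
For reference, a minimal sketch of turning summary-file generation on at write time (assuming a SparkSession named spark and a DataFrame df; the key is a Hadoop configuration property, not a SQL conf):

    // Ask parquet-mr to write the _metadata/_common_metadata summary files
    spark.sparkContext.hadoopConfiguration
      .set("parquet.enable.summary-metadata", "true")

    // Subsequent Parquet writes then produce the summary files alongside the data
    df.write.parquet("/path/to/dataset")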

@HyukjinKwon
Member

HyukjinKwon commented Aug 16, 2016

Oh, sorry about the second question. I missed val parquetSchema = metadata.getFileMetaData.getSchema, and yes, it only reads the metadata; I think I got your idea for using _metadata here.

So this reads _metadata first and then filters out some files before actually running the tasks, to reduce the number of files actually touched. I see.

@a10y
Author

a10y commented Aug 16, 2016

Yep, that's it. Effectively it just reduces task overhead when querying large Parquet datasets and turns filter queries where the filter never matches into a no-op, with no tasks generated (only when summary metadata is available).
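
As an illustration (hypothetical path and column names), a highly selective query like the one below would produce zero scan tasks under this patch when the _metadata file shows that no file can satisfy the filter:

    // Hypothetical dataset; with summary metadata present and no matching files,
    // the scan creates no tasks at all
    spark.read
      .parquet("/data/events")
      .filter("eventTime > '2016-08-01'")
      .count()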

@a10y
Author

a10y commented Aug 16, 2016

Can we rerun the tests, please? The issue should be fixed now.

@srowen
Member

srowen commented Aug 16, 2016

Jenkins add to whitelist

@srowen
Member

srowen commented Aug 16, 2016

Jenkins test this please

@SparkQA

SparkQA commented Aug 16, 2016

Test build #63848 has finished for PR 14649 at commit 91a3067.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@a10y
Author

a10y commented Aug 16, 2016

cc @rxin @liancheng Let me know what you think

@a10y
Author

a10y commented Aug 16, 2016

(Thanks Sean for whitelisting!)

@SparkQA

SparkQA commented Aug 19, 2016

Test build #64086 has finished for PR 14649 at commit 7a95989.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@a10y
Author

a10y commented Aug 22, 2016

cc @davies @cloud-fan as well

@@ -220,6 +220,21 @@ trait FileFormat {
}

/**
* Allow FileFormat's to have a pluggable way to utilize pushed filters to eliminate partitions
Contributor

typo: FileFormats

@ash211
Contributor

ash211 commented Aug 30, 2016

This looks very helpful, @andreweduffy! I see you have logging for how effective the partition filtering is. Do you have any rough benchmarks of particular workflows that were improved by this PR?

@HyukjinKwon
Member

(As I am already here,) I also think this should be helpful, in particular for S3 with Parquet. However, I am wondering whether this might be a Parquet-specific optimization only; we don't have a metadata file for other file-based data sources.

So my personal opinion is to keep this within Parquet without adding another interface. If we can implement it for other data sources in the future, we can add the interface then, but not now.

@a10y
Author

a10y commented Aug 30, 2016

@ash211 Anecdotally, it's been tested in production and took queries that previously required a full dataset scan across 100+ partitions down to 1-2 partitions (depending on how large the split sizes were). This sped up the query by several seconds; presumably the savings would be larger for much bigger datasets. The dataset we tested on was also stored in a local-network HDFS cluster, not in S3, so given that read cost is much higher in S3 I would expect the savings to be greater there.

@HyukjinKwon I can see the benefit of making this Parquet-specific; however, to get the full benefit of this change the logic needs to execute inside FileSourceScanExec so that we avoid creating tasks and reading files in the first place. This is especially important because the s3a implementation in pre-2.8 versions of hadoop-client reads entire files from the seek point to the end of the file, so pruning files out entirely is helpful.

We could put special-casing logic directly in FileSourceScanExec, e.g. if (fileFormat.isInstanceOf[ParquetFileFormat]) { ... }, but I'd imagine no one would be interested in doing something that gross.

@a10y
Author

a10y commented Aug 30, 2016

addressed typos

@SparkQA

SparkQA commented Aug 30, 2016

Test build #64647 has finished for PR 14649 at commit 163af01.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng
Contributor

Sorry for the late reply.

Firstly, Spark SQL only reads the footers of all Parquet files when schema merging is enabled, which is controlled by the SQL option spark.sql.parquet.mergeSchema, because you have to figure out the schema of every individual physical Parquet file to determine the global schema. When schema merging is disabled, which is the default, summary files (_metadata and/or _common_metadata) are still used if there are any. If no summary files are available, Spark SQL just reads the footer of a random Parquet file and gets the schema from it. So it seems that the first point mentioned in your PR description is not really a problem?
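
(For context, a quick sketch of the two ways schema merging is typically toggled; the dataset path is hypothetical:)

    // Per-read data source option
    val merged = spark.read.option("mergeSchema", "true").parquet("/data/events")

    // Or globally via the SQL conf mentioned above
    spark.conf.set("spark.sql.parquet.mergeSchema", "true")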

Secondly, although you mentioned "partition pruning", what the code change in this PR performs is actually Parquet row-group filtering, which is already a feature of Spark SQL.

Thirdly, partition pruning is already implemented in Spark SQL. Furthermore, since partition pruning is handled inside the framework of Spark SQL, not only data source filters, but also arbitrary Catalyst expressions can be used to prune partitions.

That said, I don't see the benefit of this PR. Did I miss something here?

@HyukjinKwon
Member

HyukjinKwon commented Sep 27, 2016

Hi @liancheng, setting aside the PR description and the implementation, I would like to describe what I had in mind, and I hope this makes sense.

I thought in exactly the same way as you described. However, if you look at the code closely, it actually only handles the case where there is a _metadata file, which IIUC includes block offsets and paths. So this PR uses that information to drop, for each partition, the files to touch, whereas in the normal case we would touch the files anyway and only then drop whole row groups.

So this is rather about reducing the files to touch within each partition (as far as I know, we introduced an optimization that collects small files and packs them together into each partition).

I might be wrong, but this was the reason I thought it should be helpful anyway, particularly for S3, where touching files is itself expensive.

@a10y
Author

a10y commented Sep 27, 2016

Hyun mostly sums it up. This uses the summary metadata for Parquet when available. Rather than performing row-group-level filtering, it actually filters out entire files when the summary metadata is available. It does this while constructing the FileScanRDD, which means tasks are spawned only for files that match the predicate. At work we were running into issues with S3 deployments where very large S3 datasets would take exceedingly long to load in Spark. Empirically, we're running this exact patch in production, and for many types of queries we see a very large decrease in both tasks created and time spent fetching from S3. So this is mainly for the use case of short-lived RDDs (where .persist doesn't help you) that are backed by data in S3 (where eliminating read time is a significant speedup).

@liancheng
Contributor

liancheng commented Sep 27, 2016

@HyukjinKwon @andreweduffy Thanks for the explanations! This makes much more sense to me now.

Although _metadata can be neat for the read path, it's a troublemaker for the write path:

  1. Writing summary files (either _metadata or _common_metadata) can be quite expensive when writing a large Parquet dataset, since it reads footers from all files and tries to merge them. This can be especially frustrating when appending a small amount of data to an existing large dataset. This is why we decided to disable writing summary files by default in Spark.
  2. Parquet doesn't always write the summary files even if you explicitly set parquet.enable.summary-metadata to true. For example, when two files have different values for a single key in the user-defined key/value metadata section, Parquet simply gives up writing the summary files and deletes existing ones. This may be quite common in the case of schema evolution. What makes it worse, an outdated _common_metadata might not be deleted properly due to PARQUET-359, which leaves the summary files out of sync.

However, I still agree that with a trustworthy _metadata file at hand, this patch is very useful. I'll take a deeper look at it tomorrow.

@a10y
Author

a10y commented Sep 27, 2016

Glad that helped; sorry it wasn't clearer. Agreed that writing summary metadata isn't always the best option. This patch only ever performs the file pruning if the _metadata file exists for the dataset. At work we have it enabled, since we have a query-heavy workload where new data lands occasionally.


@paulata

paulata commented Oct 30, 2016

Hi @andreweduffy, this looks very interesting. @guyGerson and I have been working on this for a while; we agree it can significantly reduce the number of tasks and files/objects touched, and we agree on its importance for querying object storage systems like S3 and OpenStack Swift. We also like your idea of introducing a new interface to handle all formats, not just Parquet, although the benefit currently seems limited to Parquet data with _metadata summaries.

With a small extension to your pull request, the same idea could work for Parquet data without _metadata summaries as well as for all file formats supported by Spark SQL.
@HyukjinKwon @liancheng Please take a look and let us know what you think; we believe this justifies adding a new interface rather than limiting the feature to Parquet only.

We have found this to be very useful in real world use cases, for example:

Madrid transportation scenario: we presented this at Spark Summit Europe 2015. This use case uses Elasticsearch to index object storage metadata and search for objects relevant to a query, for example a query looking for rush-hour traffic over the past five years. See https://spark-summit.org/eu-2015/events/how-spark-enables-the-internet-of-things-efficient-integration-of-multiple-spark-components-for-smart-city-use-cases/ (slides 9-10).

IoT data archive scenario: we have an external DB with the following information about sensor state: |sensorID -> type, manufacturer, version, isActive |
We want to compare behaviour of all active sensors with their behaviour as recorded in an object storage archive from 2014 using a single query.
Archives/dt=01-01-2014
Archives/dt=01-01-2014/sensor1.json (500MB)
Archives/dt=01-01-2014/sensor2.json (500MB)
Archives/dt=01-01-2014/sensor3.json (500MB)
Archives/dt=02-01-2014
Archives/dt=02-01-2014/sensor1.json (500MB)
Archives/dt=02-01-2014/sensor2.json (500MB)
Archives/dt=02-01-2014/sensor3.json (500MB)
more...

Note that in each case we plug in different code to do the filtering. The filtering is application-specific and not necessarily related to the file format (Parquet/CSV/JSON, etc.).

Here is a code snippet from our patch which shows how to keep the filter interface more generic. @andreweduffy please let us know what you think about extending your pull request in this way.

fileSourceInterfaces.scala:

// Interface for applications to implement their own custom file filter.
// This will be run during query execution.
trait CustomFileFilter {
  def isRequired(dataFilters: Seq[Filter], f: FileStatus): Boolean
}

DataSourceScanExec.scala:

// Code snippet showing how the custom filter is plugged into the execution process (line 159):
val customFileFilterClazzName = hadoopConf.get("spark.sql.customFileFilter")

val filteredPartitions = if (customFileFilterClazzName == null) {
  logInfo(s"No custom file filter detected")
  selectedPartitions
} else {
  logInfo(s"Custom file filter detected")
  val fileFilter = hadoopConf
    .getInstances(customFileFilterClazzName, classOf[CustomFileFilter])
    .get(0)
  selectedPartitions
    .map { part =>
      Partition(part.values, part.files.filter(f => fileFilter.isRequired(dataFilters, f)))
    }
    .filter(_.files.nonEmpty)
}
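
As an illustrative usage sketch (the SensorFileFilter class and the spark.sql.customFileFilter key below build on the snippet above and are part of this proposal, not an existing Spark API):

    import org.apache.hadoop.fs.FileStatus
    import org.apache.spark.sql.sources.Filter

    // Hypothetical application-side filter: keep only files for a particular sensor
    class SensorFileFilter extends CustomFileFilter {
      override def isRequired(dataFilters: Seq[Filter], f: FileStatus): Boolean =
        f.getPath.getName.startsWith("sensor1")
    }

    // Registered through the Hadoop configuration key read in DataSourceScanExec above
    spark.sparkContext.hadoopConfiguration
      .set("spark.sql.customFileFilter", classOf[SensorFileFilter].getName)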

@a10y
Author

a10y commented Oct 31, 2016

Hey @paulata, sounds like you want to make the pruner pluggable at runtime. That sounds like it would be good, though IMO it is a bit orthogonal to this change. This PR is more of an optimization than a feature request, i.e. you're "no worse off" performance-wise with this change and all pre-existing behavior is preserved, whereas yours is a new feature.

You're welcome to reuse the code in this PR for your own needs, or to base your changes on top of this. I have my doubts that the PR in its current form will be merged into Spark, but it's already been merged into Palantir's fork (palantir#40) so hopefully the folks there will try and upstream it at some point in the future.

@HyukjinKwon
Member

@andreweduffy, are you fine with #15835? If so, we might be able to close this.

@a10y
Author

a10y commented Nov 10, 2016

Yep yep, closing this one

@a10y a10y closed this Nov 10, 2016