[SPARK-17059][SQL] Allow FileFormat to specify partition pruning strategy via splits #15835

Closed
wants to merge 2 commits

Conversation

pwoody

@pwoody pwoody commented Nov 9, 2016

What changes were proposed in this pull request?

This is a follow-up to the changes in #14649, updated to match the current codebase. It differs slightly in that it does not filter files explicitly; instead it allows a FileFormat to prune splits (if applicable). This is implemented for ParquetFileFormat, and every other format keeps its existing behavior.

How was this patch tested?

Passes the current tests; adds two new tests to validate the pruning and to ensure that excessive filtering does not occur on malformed metadata.

Thanks!
Closes #14649

@pwoody pwoody changed the title SPARK-17059: Allow FileFormat to specify partition pruning strategy [SQL][SPARK-17059] Allow FileFormat to specify partition pruning strategy via splits Nov 9, 2016
@pwoody pwoody changed the title [SQL][SPARK-17059] Allow FileFormat to specify partition pruning strategy via splits [SPARK-17059][SQL] Allow FileFormat to specify partition pruning strategy via splits Nov 9, 2016
@pwoody
Author

pwoody commented Nov 9, 2016

@andreweduffy @robert3005 @HyukjinKwon

@AmplabJenkins

Can one of the admins verify this patch?

@pwoody
Author

pwoody commented Nov 9, 2016

There is a performance issue here in that we re-fetch the metadata file once per data file. My understanding is that FileFormat is meant to be stateless, but if we could add a cache, that would be very useful.

@HyukjinKwon
Member

HyukjinKwon commented Nov 10, 2016

Hi @pwoody. So, if I understood this correctly, the original PR only filters out the files to touch ahead of time, but this one also proposes to filter splits via offsets from Parquet's metadata on the driver side, right?

IIRC, each task already reads the footer before it actually starts reading in the executors, and then drops the splits (blocks) on the Parquet side. This works fine in both Spark's vectorized Parquet reader and the normal Parquet reader. It might be worth reducing the files to touch, for the reasons I and other reviewers described in the original PR, but I am not sure about pruning splits ahead of time.
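
For context, a rough sketch of that existing executor-side path; pushedFilters, requiredSchema, and hadoopConf are placeholder names, not the exact Spark code:

import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.hadoop.ParquetInputFormat

// Rough sketch: the pushed-down predicate is set on the Hadoop conf on the driver;
// each task's Parquet reader then reads the footer and drops non-matching row groups
// before scanning, but only after the task has already been launched.
pushedFilters
  .flatMap(ParquetFilters.createFilter(requiredSchema, _))
  .reduceOption(FilterApi.and)
  .foreach(ParquetInputFormat.setFilterPredicate(hadoopConf, _))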

Another potential problem I see here is that it does not seem to handle reading bucketed tables, whereas the original PR does.

In addition, I guess we really need a benchmark for a proposal that claims to improve performance, when we are in doubt. It is fine if I am wrong and this PR ends up having a benchmark showing the performance improvement, with a reasonable explanation.

Lastly, I guess this is meant as a follow-up that includes the changes proposed in 14649. Maybe we can wait until that one is merged before submitting a follow-up; I guess it is still being reviewed, and I think @andreweduffy is still responsive.

@HyukjinKwon
Member

I think we should cc @liancheng as well, who has good insight in this area.

@pwoody
Author

pwoody commented Nov 10, 2016

Hey @HyukjinKwon - appreciate the feedback!

Re: file touching - if I add a cache for the _metadata file, then this PR will end up touching at most one file per rootPath on the driver side (generally just one in total).

Re: files vs. splits - the main difference when pruning splits instead of files is that, with larger files, you end up spawning tasks that are immediately filtered out on the executor side after grabbing the footer. For example, if maxSplitBytes == the Parquet row group size, then a single hit in a file will spawn a task for every row group, even if the file has only one matching block. This overhead can get expensive in a setup with dynamicAllocation and multi-tenancy; I generally want to reduce the total number of tasks (see the sketch below).

Re: bucketed - yep sorry, will fix!

Re: benchmarks - yeah, totally happy to poke around and put together some benchmarks. I suppose ParquetReadBenchmark is the appropriate place?

Re: old PR - the code there is out of date, and I don't believe Andrew is actively working on it, based on his last comment. This is the follow-up to the original PR in what I believe is a more comprehensive and reliable form.
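
To make the splits-vs-files point above concrete, here is a rough sketch of driver-side row-group pruning, mirroring the reviewed code; metadata, schema, filters, and filePath are assumed to come from the surrounding getSplits implementation:

import scala.collection.JavaConverters._
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.parquet.filter2.compat.{FilterCompat, RowGroupFilter}
import org.apache.parquet.filter2.predicate.FilterApi

// Filter the row-group metadata from the summary file on the driver, then emit one
// split per surviving row group instead of one task per maxSplitBytes chunk.
val parquetSchema = metadata.getFileMetaData.getSchema
val filter = FilterCompat.get(
  filters.flatMap(ParquetFilters.createFilter(schema, _)).reduce(FilterApi.and))
val matchingBlocks =
  RowGroupFilter.filterRowGroups(filter, metadata.getBlocks, parquetSchema).asScala
matchingBlocks.map { bmd =>
  // Each surviving block covers [startingPos, startingPos + compressedSize) of the file.
  new FileSplit(filePath, bmd.getStartingPos, bmd.getCompressedSize, Array.empty[String])
}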

Thanks!

@HyukjinKwon
Member

HyukjinKwon commented Nov 10, 2016

Ah, I missed the last comment on the old PR. Okay, we can get this into better shape. BTW, Spark packs small partitions together for each task, so I guess this would not always introduce a lot of tasks, but yes, reducing the number of tasks is still a valid point.

Right, I am fine with this; I had thought the original PR was taken over without the courtesy of a notification or prior discussion. For the benchmark, I have PRs with benchmarks, 15049 and 14660, which I have also referred to from other PRs.

I have just a few minor notes. Maybe "Closes #14649" can be added at the end of the PR description (if @andreweduffy is not responding) so that the committers' merge script can close the original one if this one gets merged. Also, there are some style guidelines I usually refer to: the wiki and databricks/scala-style-guide. I will leave some comments on the changed files.

Member

@HyukjinKwon HyukjinKwon left a comment


I just left some comments for obvious style nits to sweep out.

fileStatus: FileStatus,
filters: Seq[Filter],
schema: StructType,
hadoopConf: Configuration): Seq[FileSplit] = {
Member


We might be able to fix this, and the same instances below, as:

def getSplits(
    fileIndex: FileIndex,
    fileStatus: FileStatus,
...

val parquetSchema = metadata.getFileMetaData.getSchema
val filter = FilterCompat.get(filters
.flatMap(ParquetFilters.createFilter(schema, _))
.reduce(FilterApi.and))
Member


I guess this indentation should be double-spaced.

.reduce(FilterApi.and))
val filteredMetadata =
RowGroupFilter.filterRowGroups(filter, metadataBlocks, parquetSchema).asScala
filteredMetadata.flatMap(bmd => {
Member


According to https://github.com/databricks/scala-style-guide#anonymous-methods,

filteredMetadata.flatMap { bmd =>
  ...

seems encouraged.

@pwoody
Author

pwoody commented Nov 10, 2016

Ah, awesome, thanks - my IDE settings must not be catching the style properly. I will fix these and make sure it doesn't happen in the future.

I also just took a look at the bucketing. I've implemented the same logic, which prunes the file if no splits remain, but it is still all-or-nothing for a given file because tasks are launched per bucket (rough sketch below). Commit coming soon.
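
Roughly, as a sketch only; bucketFiles, dataFilters, requiredSchema, and hadoopConf are placeholder names, not the PR's exact variables:

// A bucketed file is kept only if at least one of its splits survives pruning;
// it is never subdivided, because bucketed scans launch one task per bucket.
val prunedBucketFiles = bucketFiles.filter { fileStatus =>
  format.getSplits(fileIndex, fileStatus, dataFilters, requiredSchema, hadoopConf).nonEmpty
}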

Thanks again.


@a10y a10y left a comment


Agree this is better than the original file-based pruning strategy in my old PR. I have a few comments, which are probably largely due to a lack of understanding on my part; overall things look good.


// Ensure that the metadata has an entry for the file.
// If it does not, do not filter at this stage.
val matchingBlockExists = metadata.getBlocks.asScala.exists(bmd => {

I'm a little unclear as to why this branch is necessary. Under what conditions do you have a parquet dataset where one of its files isn't in the summary metadata? Is this the case where you write out the dataset with metadata, then add a new partition, and write without updating the metadata?

Author


By default, writes won't merge Parquet metadata, unfortunately. So if you use SaveMode.Append on an existing Parquet directory, the _metadata file gets rewritten rather than merged.

})
}

private def getMetadataForPath(filePath: Path,

I think this should be cacheable, as the lifetime of a FileFormat is tied to a particular HadoopFsRelation, which in turn is tied to a Dataset. Something may have changed about the API, but from a quick look through the relevant classes I don't see anything to refute this. I don't see any written guarantee in the FileFormat API that concrete implementations need to be stateless, but neither did I find any reference to caching/state in other implementations. Still, I think it should be safe to cache on the ParquetFileFormat for this use case; a minimal sketch is below.
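
A minimal sketch of such a cache, assuming a Guava cache keyed by the _metadata path; the names and sizing are illustrative, not the PR's actual implementation:

import java.util.concurrent.TimeUnit
import com.google.common.cache.{CacheBuilder, CacheLoader}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.ParquetMetadata

object ParquetSummaryCache {
  // Bounded, expiring cache so the driver re-reads the summary footer only on a miss.
  private val cache = CacheBuilder.newBuilder()
    .maximumSize(100)
    .expireAfterWrite(10, TimeUnit.MINUTES)
    .build(new CacheLoader[Path, ParquetMetadata] {
      override def load(metadataPath: Path): ParquetMetadata =
        // In real code the Hadoop conf would be threaded through rather than created here.
        ParquetFileReader.readFooter(new Configuration(), metadataPath)
    })

  def footerFor(metadataPath: Path): ParquetMetadata = cache.get(metadataPath)
}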

}
}

test("Do not filter out parquet file when missing in _metadata file") {

I'm not sure I understand what this test accomplishes. If you write out twice with summary metadata, why would anything be missing from the _metadata file?

Author


See the other comment, but yeah - it is for the case where you overwrite the metadata file.

@pwoody
Author

pwoody commented Nov 11, 2016

Cool - I've added the caching, fixed the style issues, and added pruning to the bucketed reads.

@pwoody
Author

pwoody commented Nov 11, 2016

I've also pushed up the ability to configure whether this feature is enabled. Here is a benchmark that writes out 200 files and reads them back with a filter, using this code:

val numPartitions = 200  // 200 output files, as described above
withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
  withTempPath { path =>
    spark.range(0, numPartitions, 1, numPartitions)
      .write.parquet(path.getCanonicalPath)
    val benchmark = new Benchmark("Parquet partition pruning benchmark", numPartitions)

    benchmark.addCase("Parquet partition pruning enabled") { iter =>
      spark.read.parquet(path.getCanonicalPath).filter("id = 0").collect()
    }

    benchmark.addCase("Parquet partition pruning disabled") { iter =>
      withSQLConf(SQLConf.PARQUET_PARTITION_PRUNING_ENABLED.key -> "false") {
        spark.read.parquet(path.getCanonicalPath).filter("id = 0").collect()
      }
    }

    benchmark.run()
  }
}
Running benchmark: Parquet partition pruning benchmark
  Running case: Parquet partition pruning enabled
  Stopped after 12 iterations, 2049 ms
  Running case: Parquet partition pruning disabled
  Stopped after 5 iterations, 2177 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_20-b26 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-3635QM CPU @ 2.40GHz

Parquet partition pruning benchmark:     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet partition pruning enabled              145 /  171          0.0      723119.3       1.0X
Parquet partition pruning disabled             414 /  436          0.0     2070279.4       0.3X



val splitFiles = partitionFiles.flatMap { case (file, values) =>
  val blockLocations = getBlockLocations(file)
  val filePath = file.getPath.toUri.toString
  val format = relation.fileFormat


nit: val format seems unused; should it be removed, or changed to val format = fsRelation.fileFormat and used below?

val blockLocations = getBlockLocations(file)
val filePath = file.getPath.toUri.toString
val format = relation.fileFormat
val session = relation.sparkSession


nit: should this be val session = fsRelation.sparkSession?

spark.sparkContext.parallelize(Seq(1, 2, 3), 3)
  .toDF("x").write.parquet(path.getCanonicalPath)
spark.sparkContext.parallelize(Seq(4))
  .toDF("x").write.mode(SaveMode.Append).parquet(path.getCanonicalPath)
Contributor


Do you need to turn off ParquetOutputFormat.ENABLE_JOB_SUMMARY to prevent the _metadata file from being updated on the second write, or is it normally not updated with SaveMode.Append?

I would've expected the second write here to update the _metadata file, in which case the test name wouldn't match the behavior. But if the second write doesn't update _metadata, then the file covers 3 out of 4 file partitions, matching the test name.

Author


Yeah - I've edited the test to clarify the intended behavior. I've also changed how we cache the metadata to reflect it.

@pwoody
Author

pwoody commented Nov 15, 2016

@rxin @HyukjinKwon ready for more review on my end.

@HyukjinKwon
Member

I guess we should ping @liancheng as he was reviewing the previous one.

@pwoody
Author

pwoody commented Nov 17, 2016

I've updated the structure of the PR to make the caching global across FileFormat instances, add expiry, and reuse known filters. Here is a new benchmark to highlight the filter re-use (I flipped the order of the cases to make the results easier to read):

    withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
      withTempPath { path =>
        spark.range(0, 200, 1, 200)
          .write.parquet(path.getCanonicalPath)
        val benchmark = new Benchmark("Parquet partition pruning benchmark", 200)
        benchmark.addCase("Parquet partition pruning disabled") { iter =>
          withSQLConf(SQLConf.PARQUET_PARTITION_PRUNING_ENABLED.key -> "false") {
            var df = spark.read.parquet(path.getCanonicalPath).filter("id = 0")
            for (i <- 1 to 10) {
              df = df.filter(s"id < $i")
              df.collect()
            }
          }
        }
        benchmark.addCase("Parquet partition pruning enabled") { iter =>
          var df = spark.read.parquet(path.getCanonicalPath).filter("id = 0")
          for (i <- 1 to 10) {
            df = df.filter(s"id < $i")
            df.collect()
          }
        }
        benchmark.run()
      }
    }
Running benchmark: Parquet partition pruning benchmark
  Running case: Parquet partition pruning disabled
  Stopped after 2 iterations, 8744 ms
  Running case: Parquet partition pruning enabled
  Stopped after 5 iterations, 2187 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_20-b26 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-3635QM CPU @ 2.40GHz

Parquet partition pruning benchmark:     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet partition pruning disabled            4332 / 4372          0.0    21659450.2       1.0X
Parquet partition pruning enabled              399 /  438          0.0     1995877.1      10.9X

@rxin
Contributor

rxin commented Nov 18, 2016

This creates huge problems when the table is big, doesn't it? We just made a big change to get rid of the per-table file status cache, because its existence made Spark unstable when dealing with large tables (tables with a lot of files).

@pwoody
Author

pwoody commented Nov 18, 2016

Hey - yeah, it's definitely a real concern, since it requires driver heap to scale with the size of the metadata of the table you are going to read.

We could get creative and add heuristics around whether to read the metadata at all when it is too large, or add the ability to spill the existing metadata to disk. Do you have any preferences/thoughts? A rough sketch of the first option is below.
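
A sketch only; the config key, default, and surrounding names (sparkSession, fs, metadataPath) are illustrative:

// Skip driver-side pruning entirely when the summary file is large, so driver heap
// usage stays bounded for tables with a lot of files.
val maxSummaryBytes = sparkSession.conf
  .get("spark.sql.parquet.pruning.maxMetadataSize", (64 * 1024 * 1024).toString).toLong
val pruneOnDriver = fs.getFileStatus(metadataPath).getLen <= maxSummaryBytes
// When pruneOnDriver is false, fall back to the existing behavior: one split per
// maxSplitBytes chunk, with row groups dropped executor-side as before.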
