
Basic Benchmarks for Iceberg Spark Data Source #105

Merged: 3 commits into apache:master on Jun 12, 2019

Conversation

@aokolnychyi (Contributor) commented Feb 18, 2019

This PR contains benchmarks for the Spark data source in Iceberg. The main goal is to compare its performance to the built-in file source in Spark.

Scenarios

  • Reading flat and nested Parquet data without any filters/projections
  • Reading flat and nested Parquet data with a projection
  • Reading flat and nested Parquet data with a filter
  • Writing flat and nested Parquet data

How to Execute

Each benchmark's Javadoc describes how to execute it.

A sample command is illustrated below (ensure the output directory exists):

./gradlew :iceberg-spark:jmh -PjmhIncludeRegex=SparkParquetFlatDataReadBenchmark -PjmhOutputPath=benchmark/parquet-flat-data-read-benchmark-result.txt
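
For orientation, here is a rough sketch of the shape such a JMH benchmark class could take. The class name, table location, and setup are illustrative assumptions rather than the actual code in this PR; only the JMH annotations and the Spark calls are standard.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;

@Fork(1)
@State(Scope.Benchmark)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
public class ExampleFlatDataReadBenchmark {

  // Hypothetical location of the table generated in setup; not a path used by this PR.
  private static final String TABLE_LOCATION = "/tmp/iceberg-benchmark-table";

  private SparkSession spark;

  @Setup
  public void setupBenchmark() {
    spark = SparkSession.builder().master("local").getOrCreate();
    // Generate and write the flat Parquet dataset here, before measurements start.
  }

  @TearDown
  public void tearDownBenchmark() {
    if (spark != null) {
      spark.stop();
    }
  }

  @Benchmark
  public void readIceberg() {
    Dataset<Row> df = spark.read().format("iceberg").load(TABLE_LOCATION);
    // Materialize without collecting to the driver (see "Materializing Datasets" below).
    df.queryExecution().toRdd().toJavaRDD().foreach(record -> {});
  }
}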

Summary

  • Iceberg DS is V2 and the file source is still V1, which complicates the comparison.
  • Adding support for vectorized execution in Iceberg is important. Right now, the file source supports vectorized execution on flat data only.
  • The Parquet reader in Iceberg handles nested data more efficiently.
  • Iceberg can boost the performance of highly selective queries by leveraging file stats (see the sketch after this list).
  • Spark implements nested schema pruning only for the file source, which is odd and should be fixed.
  • Spark does not push down filters on nested columns to data sources. There is an ongoing effort to add this functionality in Spark 3.0.
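
To make the file-stats point concrete, here is a hedged sketch of the kind of highly selective query that benefits from file skipping. The table location and column name are hypothetical, and an existing SparkSession named spark is assumed.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Assumes an Iceberg table written at this hypothetical location.
Dataset<Row> df = spark.read().format("iceberg").load("/tmp/iceberg-benchmark-table");

// If intCol values are clustered per file, the min/max stats in Iceberg's manifests
// let the planner drop files that cannot contain matching rows before any data is read.
long matches = df.filter("intCol = 42").count();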

Overall, it seems the Iceberg Parquet reader and writer are more efficient, but it is important to add vectorized execution. It would also make sense to redo the benchmarks once the file source is migrated to DS V2.

Open Questions

Ways to run

The recommended way to run JMH benchmarks is via Maven or Gradle. However, we could still integrate them with JUnit if that gives us more flexibility.
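
For reference, JMH can also be launched programmatically through its Runner API, which is what a JUnit-style integration would most likely build on. A minimal sketch (the launcher class name is illustrative):

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class BenchmarkLauncher {
  public static void main(String[] args) throws RunnerException {
    Options options = new OptionsBuilder()
        .include("SparkParquetFlatDataReadBenchmark")  // regex, same role as -PjmhIncludeRegex
        .forks(1)
        .build();
    new Runner(options).run();
  }
}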

Materializing Datasets

One way to materialize Datasets in Java:

ds.foreach(record -> {});

This approach computes the underlying rdd as follows:

lazy val rdd: RDD[T] = {
  val objectType = exprEnc.deserializer.dataType
  rddQueryExecution.toRdd.mapPartitions { rows =>
    rows.map(_.get(0, objectType).asInstanceOf[T])
  }
}

The current way looks a bit ugly:

ds.queryExecution().toRdd().toJavaRDD().foreach(record -> {});

However, it should be more efficient. It computes the underlying rdd as follows:

/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
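
If we keep the more efficient variant, one option is to hide the longer call chain behind a small helper in a shared benchmark base class. This is only a sketch, and materialize is an illustrative name, not something in this PR:

import org.apache.spark.sql.Dataset;

// Forces a full scan without deserializing rows into user objects
// or collecting results to the driver.
private static void materialize(Dataset<?> ds) {
  ds.queryExecution().toRdd().toJavaRDD().foreach(record -> {});
}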

Documentation

We need proper documentation on how to run/extend/create benchmarks.

@aokolnychyi (Contributor, Author)

@rdblue @mccheah @danielcweeks Initially, I created these benchmarks to answer a few questions I had. Let's see if they make sense upstream. I found them quite useful.

This implementation is just one potential way of doing this.

@prodeezy (Contributor) commented Mar 5, 2019

@aokolnychyi these are great, and the code is useful for others to use and extend.

Can you add a manual entry describing how you built and ran these benchmarks (which command-line statements were used)? You could add it to spark/README.md. Maybe also include instructions on how one can extend them.

@aokolnychyi (Contributor, Author)

@prodeezy yeah, I agree it will be useful to describe how to run and extend these if we decide to have the benchmarks upstream. For now, that information is only in the Javadoc of each benchmark.

Later, it would also be important to check that the results are reliable and consistent across different machines.

@danielcweeks (Contributor)

@aokolnychyi Thanks for doing this, I think there are some really good insights and questions that come out of this. I would characterize some of them as follows (ordered from least to most controversial, from my perspective):

  • Write path is effectively the same in performance (within error)

  • For the file skipping tests, I'm concerned that the difference in the number of files processed per task undermines any conclusions we can reasonably draw from the results. With task combining being different, the largest factor may be the work done within the task. The flat data test demonstrates that, as you point out, and the nested data test could be skewing in the other direction because all files are processed in one task.

  • For the materialized scans, I feel we would get more accurate results by better isolating the read path (i.e. using the read path directly as opposed to running through a job). There are quite a number of variables above the materialization that could impact performance, especially with such small datasets, so I don't have a lot of confidence in the order of magnitude of the comparative difference or in how it would scale with different datasets.

Also, I would prefer that we don't actually commit the results of the benchmarks, as there are just too many variables to have a canonical version. For example, the underlying filesystem (e.g. HDFS, S3) would likely impact performance in different areas (and scale differently across the number of partitions/files/etc.). Hardware and local customizations can also have a pronounced impact.

Benchmarks are notoriously controversial, so allowing them to be run and interpreted in the environment they will be used is often the most relevant way to deal with all of these variables.

I would be in favor of adding this (minus the results), along with more instruction and guidance on how to use and configure it, if for no other reason than to have a good starting point for adding other benchmarks and iterating on improving the accuracy.

@aokolnychyi (Contributor, Author)

Thanks for your feedback, @danielcweeks!

I think one of the most important questions is how we want to use such benchmarks.

There might be a few options.

  1. Contributors verify the performance impact of their fixes/improvements by running a particular benchmark with and without their change.
  2. Users run benchmarks to try out Iceberg and see how it compares to the built-in Spark file source.
  3. Committers run these benchmarks before every release to see the performance difference and prevent any degradation.

It would be great to support option 3, but I believe we are far from that. Instead, we can focus on a generic framework for options 1 and 2. Ideally, we should be able to use it locally as well as on existing datasets (e.g., point it at a dataset in HDFS).

I agree the file skipping benchmarks are a bit controversial. The main idea was to show that Iceberg doesn't touch irrelevant files, which boosts the performance of highly selective queries. However, these benchmarks would make more sense on real data. So, we can either remove the file skipping benchmarks completely or try to make them generic enough that users can also run them on real datasets.

It definitely makes sense to have a benchmark for Parquet readers alone. That would be a fair comparison. However, I think it is still useful to see the end-to-end performance, which covers a lot of aspects. For example, the read/write path for Spark Data Source V2 can be a bit slower. We need to catch such things and fix them.

Excluding the results makes sense to me.

@danielcweeks @prodeezy @rdblue, it would be really great to hear your opinion on how we see the future of such benchmarks. If we decide to have them, I will update the PR.

@rdblue (Contributor) commented Apr 18, 2019

It sounds to me like the right approach is to remove the results and get this in as it is today. We can work on improvements later and I'd rather have these maintained than sitting in a PR.

@aokolnychyi (Contributor, Author)

Alright, I'll remove the results and update the PR.

@rdblue where do we want to document how to change/configure/extend them?

@danielcweeks (Contributor)

Agree. We can iterate on how we want to evaluate benchmark performance, but this is a good starting point.

@rdblue (Contributor) commented Apr 18, 2019

I think documentation should go into the ASF site. The source is located here: https://github.com/apache/incubator-iceberg/tree/master/site

@aokolnychyi (Contributor, Author)

@danielcweeks @prodeezy @rdblue I've updated the benchmarking code and removed the results. Please let me know what you think.

I am not sure we want to publicly document these benchmarks right now. I think it would make more sense once we have separate benchmarks for the Parquet reader/writer. At this point, the Javadoc of each benchmark describes how to execute it.

@aokolnychyi (Contributor, Author)

I've implemented basic benchmarks for readers/writers. This PR is ready for the first review round.

@rdblue (Contributor) commented May 24, 2019

@danielcweeks, can you take a look?

@danielcweeks (Contributor)

@aokolnychyi This looks good for a first round of benchmarks to start iterating on. If there are others you think we should start tracking (even if we don't implement them now) you might want to open issues for them.

One minor issue: I had to add jvmArgs = '-Xmx5g' to the build configuration so the benchmarks wouldn't run out of memory, but that may be specific to my environment.

+1 Thanks for the contribution!

@danielcweeks merged commit 177b63a into apache:master on Jun 12, 2019