
Vectorize reads and deserialize to Arrow #9

Closed
rdblue opened this issue Nov 26, 2018 · 17 comments
@rdblue
Contributor

rdblue commented Nov 26, 2018

Iceberg does not use vectorized reads to produce data for Spark. For cases where Spark can use its vectorized read path (flat schemas, no evolution) Spark will be faster. Iceberg should solve this problem by adding a vectorized read path that deserializes to Arrow RowBatch. Spark already has support for Arrow data from PySpark.
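The intuition behind the requested vectorized path can be sketched with a stdlib-only Java example (hypothetical names, not Iceberg code): decoding values into columnar batches amortizes the per-record overhead that a row-at-a-time reader pays on every single value.

```java
// Hypothetical sketch: row-at-a-time reading versus batched reading.
// Plain int[] arrays stand in for Arrow vectors here.
import java.util.Arrays;

public class BatchReadSketch {
  // Row-at-a-time: in a real reader, every value costs per-row dispatch,
  // null checks, and iterator bookkeeping.
  static long sumByRow(int[] data) {
    long total = 0;
    for (int i = 0; i < data.length; i++) {
      total += data[i];
    }
    return total;
  }

  // Batched: values are decoded into a columnar buffer once per batch, then
  // consumed in a tight loop. Arrow FieldVectors play this role in practice.
  static long sumByBatch(int[] data, int batchSize) {
    long total = 0;
    for (int start = 0; start < data.length; start += batchSize) {
      int end = Math.min(start + batchSize, data.length);
      int[] batch = Arrays.copyOfRange(data, start, end);  // stand-in for a vector
      for (int v : batch) {
        total += v;
      }
    }
    return total;
  }

  public static void main(String[] args) {
    int[] data = {1, 2, 3, 4, 5, 6, 7};
    System.out.println(sumByRow(data));       // 28
    System.out.println(sumByBatch(data, 3));  // 28
  }
}
```

Both paths compute the same result; the batched shape is what lets a real reader skip per-value overhead and hand engines whole column chunks.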

@rdblue
Contributor Author

rdblue commented Dec 8, 2018

There's more context and discussion on the issue in the old Netflix project: Netflix/iceberg#90

@rdblue rdblue closed this as completed Dec 8, 2018
@rdblue rdblue reopened this Dec 8, 2018
@prodeezy
Contributor

prodeezy commented May 21, 2019

@rdblue .. I ran some benchmarks on production-sized data, and we're seeing a large enough gap in scan performance between Spark's core reader implementation (with vectorization) and Iceberg's Parquet reader. (I can share them, but I guess the difference is expected.)

I'd be glad to work on this effort. If you have thoughts on the general approach and challenges from previous discussions, please let me know. I understand this is probably a pretty large undertaking because it involves other formats as well. Meanwhile, I'm trying to get something working so I can understand the challenges involved and then work on a proposal. But I'd appreciate your thoughts on things we should address in a potential solution.

/cc @rominparekh @fbocse @aokolnychyi @mccheah

@aokolnychyi
Contributor

aokolnychyi commented May 21, 2019

I would totally support this effort. The benchmarks in this PR also confirm that vectorized execution is important.

I should mention that Iceberg's Parquet reader seems to be significantly more efficient on nested data.

@mccheah
Contributor

mccheah commented May 21, 2019

I would certainly review and collaborate on this!

@aokolnychyi Spark disables vectorized reads on nested data, so that result indicates Iceberg without vectorized reads is faster than Spark without vectorized reads.

@aokolnychyi
Contributor

@mccheah you are right that we don't have vectorized reads on nested data in Spark. The benchmarks in the PR above test vectorized and non-vectorized reads on flat and nested data. On flat data, Iceberg is slightly faster. The real difference is seen on nested data.

As I also said in that PR, Iceberg DS is V2 and the file source is still V1, which complicates the comparison. I am working on benchmarks for readers alone without Spark.

@rdblue
Contributor Author

rdblue commented May 22, 2019

It would be great to have people working on this! Feel free to pick it up and I'll help review it. I think there are good tests that we can copy that are used to validate the row-based readers and writers.

@julienledem may also be interested. He has given talks on how to efficiently deserialize Parquet to Arrow and can hopefully help answer questions.

@danielcweeks
Contributor

For anyone interested, Spark already has a good reference implementation in Scala here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala

One thing to note is that there isn't an implementation for the map type. I think Arrow was planning on using list<struct<key, value>>, but engines may need something closer to struct<keys: list<>, values: list<>> in order to overlay the columnar implementations directly. It might be necessary to provide both.

I'm happy to help anyone who's interested in working on this.
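To make the two candidate map layouts concrete, here's a stdlib-only Java sketch (hypothetical names; plain collections stand in for Arrow vectors) showing the same logical map in both physical shapes:

```java
// Illustrative sketch of the two physical map layouts: an entries list
// (list<struct<key, value>>) versus parallel key/value lists
// (struct<keys: list<>, values: list<>>). Not Arrow API code.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MapLayouts {
  // Layout 1: one entries array, each slot holding a (key, value) pair.
  static List<Map.Entry<String, Integer>> asEntryList(LinkedHashMap<String, Integer> m) {
    return new ArrayList<>(m.entrySet());
  }

  // Layout 2: two parallel arrays, which some engines can overlay directly
  // onto their columnar representations.
  static Object[] asParallelLists(LinkedHashMap<String, Integer> m) {
    return new Object[] {new ArrayList<>(m.keySet()), new ArrayList<>(m.values())};
  }

  public static void main(String[] args) {
    LinkedHashMap<String, Integer> m = new LinkedHashMap<>();
    m.put("a", 1);
    m.put("b", 2);
    System.out.println(asEntryList(m));                       // [a=1, b=2]
    System.out.println(Arrays.toString(asParallelLists(m)));  // [[a, b], [1, 2]]
  }
}
```

Both shapes carry the same logical data; the difference is which one an engine can consume without a per-entry restructuring pass.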

@danielcweeks
Contributor

@aokolnychyi I just made a pull request for a schema converter between Iceberg and Arrow, which is the first step in getting set up for building vectors: #194

@prodeezy
Contributor

Thanks @danielcweeks! I also found https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala, which has some nice plumbing for converting batches to Iterator[InternalRow] and vice versa. This is used by Spark's Dataset to convert partition/split data into Arrow batch RDDs.

@aokolnychyi
Contributor

aokolnychyi commented May 24, 2019

Another point about the current reader is that we first construct GenericInternalRows and then have an unsafe projection to get UnsafeRows.
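A stdlib-only sketch of that double materialization (hypothetical names; a ByteBuffer stands in for UnsafeRow's flat buffer, an Object[] for GenericInternalRow):

```java
// Sketch of the two-step read path: values are first boxed into a generic
// row, then copied by a projection into one compact fixed-width buffer.
// Not Spark code; names are illustrative stand-ins.
import java.nio.ByteBuffer;

public class ProjectionSketch {
  // Step 1: a generic row holds boxed Objects — one allocation per value.
  static Object[] genericRow(int id, long ts) {
    return new Object[] {id, ts};
  }

  // Step 2: the "unsafe" projection copies the same values into a flat
  // buffer, so every value is touched (and unboxed) a second time.
  static ByteBuffer unsafeProject(Object[] row) {
    ByteBuffer buf = ByteBuffer.allocate(12);
    buf.putInt((Integer) row[0]);
    buf.putLong((Long) row[1]);
    buf.flip();
    return buf;
  }

  public static void main(String[] args) {
    ByteBuffer buf = unsafeProject(genericRow(7, 100L));
    System.out.println(buf.getInt(0));   // 7
    System.out.println(buf.getLong(4));  // 100
  }
}
```

A vectorized path sidesteps both steps by decoding straight into columnar buffers.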

Here are benchmark results for readers/writers alone (without Spark):

Benchmark                                                                          Mode  Cnt  Score   Error  Units
SparkParquetReadersFlatDataBenchmark.readUsingIcebergReader                          ss    5  7.336 ± 0.298   s/op
SparkParquetReadersFlatDataBenchmark.readUsingIcebergReaderUnsafe                    ss    5  8.533 ± 0.237   s/op
SparkParquetReadersFlatDataBenchmark.readUsingSparkReader                            ss    5  9.859 ± 0.335   s/op

Benchmark                                                                          Mode  Cnt  Score   Error  Units
SparkParquetReadersFlatDataBenchmark.readWithProjectionUsingIcebergReader            ss    5  3.964 ± 0.140   s/op
SparkParquetReadersFlatDataBenchmark.readWithProjectionUsingIcebergReaderUnsafe      ss    5  5.032 ± 0.061   s/op
SparkParquetReadersFlatDataBenchmark.readWithProjectionUsingSparkReader              ss    5  5.076 ± 0.181   s/op

Benchmark                                                                          Mode  Cnt  Score   Error  Units
SparkParquetReadersNestedDataBenchmark.readUsingIcebergReader                        ss    5  3.925 ± 0.225   s/op
SparkParquetReadersNestedDataBenchmark.readUsingIcebergReaderUnsafe                  ss    5  4.514 ± 0.097   s/op
SparkParquetReadersNestedDataBenchmark.readUsingSparkReader                          ss    5  5.673 ± 0.194   s/op

Benchmark                                                                          Mode  Cnt  Score   Error  Units
SparkParquetReadersNestedDataBenchmark.readWithProjectionUsingIcebergReader          ss    5  1.914 ± 0.126   s/op
SparkParquetReadersNestedDataBenchmark.readWithProjectionUsingIcebergReaderUnsafe    ss    5  2.408 ± 0.058   s/op
SparkParquetReadersNestedDataBenchmark.readWithProjectionUsingSparkReader            ss    5  2.682 ± 0.165   s/op

Benchmark                                                                          Mode  Cnt  Score   Error  Units
SparkParquetWritersFlatDataBenchmark.writeUsingIcebergWriter                         ss    5  4.789 ± 0.168   s/op
SparkParquetWritersFlatDataBenchmark.writeUsingSparkWriter                           ss    5  4.948 ± 0.188   s/op

Benchmark                                                                          Mode  Cnt  Score   Error  Units
SparkParquetWritersNestedDataBenchmark.writeUsingIcebergWriter                       ss    5  2.750 ± 0.184   s/op
SparkParquetWritersNestedDataBenchmark.writeUsingSparkWriter                         ss    5  2.859 ± 0.222   s/op

The code is available in the above PR.

@mccheah
Contributor

mccheah commented Jul 8, 2019

Is there a plan for how we're going to tackle this? So far I've only seen concrete work in #194. Is there a plan to build out the entire Arrow integration? Perhaps it's appropriate to break down this issue into multiple sub-issues so that PRs can target the sub-issues - then we can merge everything knowing that each piece will be part of a known complete picture?

@danielcweeks
Contributor

@mccheah We have started working on a more complete integration with Arrow and will follow up incrementally as things progress. @anjalinorwood and @samarthjain are working on this here at Netflix. There are still some things that need to be proved out, but we plan to chunk the work into smaller pieces for better review and comment.

@prodeezy
Contributor

prodeezy commented Jul 23, 2019

I've added a WIP branch with a working POC of vectorization for primitive types in Iceberg:
https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP

Implementation Notes:

  • Iceberg's Reader adds a SupportsScanColumnarBatch mixin to instruct DataSourceV2ScanExec to use planBatchPartitions() instead of the usual planInputPartitions(). It returns instances of ColumnarBatch on each iteration.
  • ArrowSchemaUtil contains the Iceberg-to-Arrow type conversion. This was copied from [3], added by @danielcweeks. Thanks for that!
  • VectorizedParquetValueReaders contains ParquetValueReaders used for reading/decoding the Parquet row groups (aka page stores, as they are referred to in the code).
  • VectorizedSparkParquetReaders contains the visitor implementations that map Parquet types to the appropriate value readers. I implemented the struct visitor so that the root schema can be mapped properly. This has the added benefit of vectorization support for structs, so yay!
  • For the initial version, the value readers read an entire row group into a single Arrow FieldVector. I'd imagine this will require tuning to find the right batch sizing, but I've gone with one batch per row group for now.
  • Arrow FieldVectors are wrapped using ArrowColumnVector, which is Spark's ColumnVector implementation backed by Arrow. This is the first contact point between the Spark and Arrow interfaces.
  • ArrowColumnVectors are stitched together into a ColumnarBatch by ColumnarBatchReader. This is my replacement for InternalRowReader, which maps structs to columnar batches. This allows us to have nested structs, where each level of nesting would be a nested columnar batch. Let me know what you think of this approach.
  • I've added value readers for all supported primitive types listed in AvroDataTest. There's a corresponding test for the vectorized reader under TestSparkParquetVectorizedReader.
  • I haven't fixed all the Checkstyle errors, so you will have to turn Checkstyle off in build.gradle. Also skip tests while building.. sorry! :-(

P.S. There's some unused code under ArrowReader.java; ignore it, as it's not used. It was from my previous implementation of vectorization. I've kept it around to compare performance.

Let me know what folks think of the approach. I'm getting this working for our scale-test benchmark and will report back with numbers. Feel free to run your own benchmarks and share.
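The batch-per-row-group stitching described in the notes above can be sketched with plain Java (hypothetical stand-ins for FieldVector and ColumnarBatch, not the actual POC code):

```java
// Sketch: each row group is decoded column-by-column into vectors, and the
// per-column vectors are stitched into one batch per row group.
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ColumnarBatchSketch {
  // Stand-in for an Arrow FieldVector: one decoded column of a row group.
  static final class ColumnVector {
    final String name;
    final int[] values;

    ColumnVector(String name, int[] values) {
      this.name = name;
      this.values = values;
    }
  }

  // Stand-in for Spark's ColumnarBatch: vectors stitched together, sharing
  // a single row count.
  static final class Batch {
    final List<ColumnVector> columns;
    final int numRows;

    Batch(List<ColumnVector> columns, int numRows) {
      this.columns = columns;
      this.numRows = numRows;
    }
  }

  // One batch per row group: decode every column of the group, then wrap.
  static Batch readRowGroup(Map<String, int[]> rowGroup) {
    List<ColumnVector> columns = new ArrayList<>();
    int numRows = 0;
    for (Map.Entry<String, int[]> column : rowGroup.entrySet()) {
      columns.add(new ColumnVector(column.getKey(), column.getValue()));
      numRows = column.getValue().length;
    }
    return new Batch(columns, numRows);
  }

  public static void main(String[] args) {
    Map<String, int[]> rowGroup = new LinkedHashMap<>();
    rowGroup.put("id", new int[] {1, 2, 3});
    rowGroup.put("qty", new int[] {10, 20, 30});
    Batch batch = readRowGroup(rowGroup);
    System.out.println(batch.columns.size() + " columns, " + batch.numRows + " rows");
  }
}
```

Nesting follows the same shape: a struct column can itself be a batch of its child vectors.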

@danielcweeks
Contributor

A few of us met with @prodeezy to discuss the POC implementation (great work, btw) and look at the future work we all hope to contribute to. I've captured some of the discussion, ideas about where we're going, and considerations here: https://docs.google.com/document/d/1qVcowrYP6xBoB9C4htwEA0QvbHpdstzieNsX26SMG2k/edit#heading=h.yun6jblu7cfi

Feel free to add comments. I think with a little cleanup we might be close to an initial implementation that we can start iterating on. @rdblue had proposed creating a branch, which I'm all for, so we can work more openly and get more eyes on the implementation.

(@samarthjain @anjalinorwood)

@prodeezy
Contributor

Will be using https://github.com/apache/incubator-iceberg/tree/vectorized-read going forward to iterate on this feature.

@prodeezy
Contributor

prodeezy commented Aug 1, 2019

Vectorization Perf Meeting notes (Aug 1):

After running benchmarks, I met with @samarthjain, @anjalinorwood, and @rdblue to go over possible improvements, and we came up with the following.

Possible low-hanging fruit for perf:

  • Remove virtual calls in the vector reader; make concrete classes (Anjali)
  • Remove hasNext() calls and currentDefinitionLevels() checks, and tighten the VectorReader.read() loop (Anjali)
  • Look into Arrow memory allocation; try to preallocate memory at initialization and avoid re-allocations (Samarth)
  • Slowness is seen in time spent in FieldVector.setSafe() calls. Improve Decimal/String variable-byte reading; make it fixed-length based or use binary reading (Gautam)

Deeper look:

  • Possibly look into removing advance() calls at every stage in ColumnIterator
  • Look into removing the triple-iterator abstraction and working at a lower level with Parquet
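The devirtualization and tight-loop items above can be illustrated with a small stdlib-only sketch (hypothetical names, not Iceberg's reader classes):

```java
// Sketch: a per-value interface (virtual) call site versus a concrete
// reader class with a tight loop and no per-element hasNext()/definition-
// level checks.
public class DevirtualizeSketch {

  // Virtual dispatch: every value costs an interface call the JIT may not
  // be able to inline at a polymorphic call site.
  interface ValueReader {
    int read(int index);
  }

  static long sumVirtual(ValueReader reader, int numValues) {
    long total = 0;
    for (int i = 0; i < numValues; i++) {
      total += reader.read(i);  // potentially megamorphic in a real reader
    }
    return total;
  }

  // Concrete class: read access is monomorphic, so the loop body can be
  // inlined and bounds are known up front.
  static final class IntColumnReader {
    private final int[] page;

    IntColumnReader(int[] page) {
      this.page = page;
    }

    long sum() {
      long total = 0;
      for (int v : page) {  // tight loop, no per-element state checks
        total += v;
      }
      return total;
    }
  }

  public static void main(String[] args) {
    int[] page = {5, 10, 15};
    System.out.println(sumVirtual(i -> page[i], page.length));  // 30
    System.out.println(new IntColumnReader(page).sum());        // 30
  }
}
```

Both produce the same sum; the concrete-class shape is what the "remove virtual calls" item aims for in the hot decode path.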

rdsr added a commit to rdsr/incubator-iceberg that referenced this issue Oct 26, 2019
…n ids to an incoming schema as per the table schema (apache#9)
rdsr added a commit to rdsr/incubator-iceberg that referenced this issue Mar 13, 2020
…n ids to an incoming schema as per the table schema (apache#9)

Disable test broken due to LIHADOOP-49587
@rdblue
Contributor Author

rdblue commented Jun 22, 2020

Since #828 was merged, I'm going to close this. Let's track the remaining work in the vectorized read milestone.

@rdblue rdblue closed this as completed Jun 22, 2020
moulimukherjee referenced this issue in moulimukherjee/iceberg Jul 24, 2020
HotSushi pushed a commit to HotSushi/iceberg that referenced this issue Jul 31, 2020
…o an incoming schema as per the table schema (apache#9)

Disable test broken due to LIHADOOP-49587
bkahloon pushed a commit to bkahloon/iceberg that referenced this issue Feb 27, 2021
…o an incoming schema as per the table schema (apache#9)

Disable test broken due to LIHADOOP-49587