Use Iceberg writers for Parquet data written from Spark. #63
@aokolnychyi, this PR avoids using Spark's WriteSupport for Parquet. It would be interesting to see which option performs better. My initial testing shows this new write path takes about 48 seconds for 500,000 random records, where the Spark WriteSupport takes about 52 seconds.
@rdblue I'll give it a try in a few days.
I have a local benchmark that compares writing 5,000,000 records in Parquet format using gzip compression via Iceberg and via the Spark native file source. Here are the results with this PR:
This PR is also critical because Iceberg will collect proper metadata per file (e.g., min/max values). As of now, we only keep track of the number of rows during writes via the Iceberg Spark data source.
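The per-file min/max metadata is what makes file pruning possible at scan-planning time. A minimal sketch of the idea, using a simplified stand-in for the real manifest data (the class and method names below are hypothetical, not Iceberg's actual API):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: shows why recording lower/upper bounds per data file
// lets a planner skip files whose value range cannot satisfy a predicate
// such as `col > literal`.
public class MinMaxPruning {
  static final class FileStats {
    final String path;
    final long min;
    final long max;

    FileStats(String path, long min, long max) {
      this.path = path;
      this.min = min;
      this.max = max;
    }
  }

  // Returns only the files that could contain rows matching col > literal.
  public static List<String> filesForGreaterThan(List<FileStats> files, long literal) {
    List<String> matches = new ArrayList<>();
    for (FileStats f : files) {
      // If the file's upper bound is <= literal, no row in it can match,
      // so the file is skipped without being opened.
      if (f.max > literal) {
        matches.add(f.path);
      }
    }
    return matches;
  }
}
```

Without the min/max stats, every file would have to be read to evaluate the predicate.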
Thanks @aokolnychyi! Looks like it isn't faster, but at least it isn't slower.
I will share the benchmarking code later and we can discuss whether the test is valid. Overall, I saw that performance on flat data is very close, but Iceberg is a bit faster on nested data. I have more benchmarks for the read path, and there are some interesting findings there as well.
```java
private final int precision;
private final int scale;
private final int length;
private final ThreadLocal<byte[]> bytes;
```
At one point would multiple threads access the same writer? Does this truly need to be a `ThreadLocal`?
Wonder if we can achieve something similar by making our own byte buffer pool that's instantiated here. Can make the lifecycle of the reused byte buffers more explicit.
These writer classes are used to build a tree that can consume objects to be written to Parquet files. Those trees are expensive to build because they require traversing the table schema. That means we may want to cache and reuse them later, possibly from multiple threads. Because the function they perform is stateless, it would be easy to later assume that these are thread-safe. That's why I went ahead and made them thread-safe.
We could change how this is done, but it is one byte array allocated per thread, so I think this light-weight solution is fine.
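The pattern being discussed can be sketched as follows, assuming a simplified writer (the class and method names here are illustrative, not from the PR): each thread lazily gets its own reusable byte array, so a cached writer tree can be shared across threads without per-value allocation.

```java
// Hypothetical sketch of a writer that reuses a per-thread buffer so
// shared writer instances stay thread-safe.
public class FixedBufferWriter {
  private final int length;
  private final ThreadLocal<byte[]> bytes;

  public FixedBufferWriter(int length) {
    this.length = length;
    // withInitial allocates one reusable array per thread, on first use
    this.bytes = ThreadLocal.withInitial(() -> new byte[length]);
  }

  // Encodes the value big-endian into this thread's reusable buffer.
  public byte[] fill(long value) {
    byte[] buffer = bytes.get(); // same array on every call from this thread
    for (int i = length - 1; i >= 0; i--) {
      buffer[i] = (byte) (value & 0xFF);
      value >>>= 8;
    }
    return buffer;
  }
}
```

The returned buffer is only valid until the next call on the same thread, which is the explicit-lifecycle concern raised above; the trade-off is zero allocation per record after the first.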
```java
@Override
public ParquetValueWriter<?> map(GroupType map,
                                 ParquetValueWriter<?> keyWriter,
```
Shouldn't indentation for this and many other methods in this class be 4 spaces in from the `public` modifier on the previous line? Also might want `map` to be on its own line. In general, the formatting for method declarations throughout seems off.
The rule I was using was to indent to the start of arguments, and if any one argument could not fit in the space, to move all arguments back to 4 spaces from the method definition's start. I'm fine with updating this convention.
Out of curiosity, how many args do we want to keep per line? I've seen a few args per line in other places.
We should standardize this. It would be good to just pick a rule.
We tested this change locally and did not notice any issues so far. It would be nice to see this merged once @mccheah's comments are addressed.
@rdblue Thanks for this PR. I applied it to latest and ran my functional tests. Works great! Confirmed that top-level stats are now kept in manifests and filters can prune files :-) File pruning on predicates is a critical feature for us. Looking forward to seeing this merged. P.S. The master branch has diverged slightly, so the PR patch didn't apply cleanly and some slight changes were needed. No biggie, but wanted to let you know.
```java
case INT_8:
case INT_16:
case INT_32:
case INT_64:
```
I realize that INT96 is deprecated per https://issues.apache.org/jira/browse/PARQUET-323 and I don't want to encourage people to use it in Iceberg. But if people do want to rewrite data with INT96 values into Iceberg (as INT64), would we run into this code? If so, shouldn't we handle it by truncating INT96 to INT64?
No, by the time this code gets INT96 values, they will be converted to binary, fixed, or a long. That's the engine's job because Iceberg won't be used to read INT96 values from files. So another read implementation produces values, then the engine hands them to Iceberg using a known type. No need to handle it here.
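For context, the conversion the engine would perform is mechanical. A hedged sketch (the class and method names are illustrative; the 12-byte layout, 8 bytes nanos-of-day followed by 4 bytes Julian day, both little-endian, is the Parquet INT96 timestamp encoding):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Illustrative helper showing how an engine could turn a Parquet INT96
// timestamp into a long (microseconds since the Unix epoch) before
// handing it to Iceberg as a known type.
public class Int96Conversion {
  // Julian day number of 1970-01-01, the Unix epoch
  private static final long UNIX_EPOCH_JULIAN_DAY = 2_440_588L;
  private static final long MICROS_PER_DAY = 86_400_000_000L;

  public static long int96ToMicros(byte[] int96) {
    ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
    long nanosOfDay = buf.getLong();                    // first 8 bytes
    long julianDay = Integer.toUnsignedLong(buf.getInt()); // last 4 bytes
    return (julianDay - UNIX_EPOCH_JULIAN_DAY) * MICROS_PER_DAY + nanosOfDay / 1_000;
  }
}
```

This is lossy only in that sub-microsecond precision is dropped, which matches the point above: it is the reading engine's job, not Iceberg's.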
(force-pushed from 08ee543 to ee0330d)
@mccheah, @prodeezy, @aokolnychyi, I've rebased and updated this PR for your review comments. Please have another look when you get a chance. Thank you!
(force-pushed from ee0330d to 61d3195)
Minor nit: it would be nice to update the PR title to also reflect that this change adds top-level column metrics to Iceberg.
```java
@Override
public ParquetValueWriter<?> message(MessageType message,
                                     List<ParquetValueWriter<?>> fieldWriters) {
```
Style: fix indentation to standard (+2 spaces) for all methods in this file.
There's discussion on this further down. We should decide on a style for long method arguments and add rules to enforce it. I have generally used this style, but I know others have used 4 spaces and one argument per line. Some places where this style can't fit single arguments have used 4 spaces from the method definition and as many arguments per line as possible... so while I would say that this is pretty normal for the codebase, we need to clean it up.
Also, pending some style fixes, this looks good to me.
@prodeezy: what does this PR change that affects top-level column metrics? This shouldn't change metrics because they are taken from the footer after the file is written, which doesn't have much to do with how records are deconstructed and written. #136 addresses the missing nested metrics problem, right?
I think @prodeezy meant that the metrics are not actually fetched from the footer as of today. Correct me if I am wrong, but we still use … Anyway, I believe collecting min/max stats is a side effect of switching to Iceberg writers. The main feature is that we actually switch to Iceberg writers, so it makes sense to keep the PR name as it is.
I didn't know that was broken! Thanks for pointing it out. I've opened #137 to fix the bug in the
Thanks for the reviews, everyone! I'm merging this.
This removes the use of Spark's Parquet WriteSupport and replaces it with Parquet writers like those used for Iceberg generics and Avro records.