
Spark - Accept an output-spec-id that allows writing to a desired partition spec #7120

Merged
merged 11 commits into from
Apr 13, 2023

Conversation

gustavoatt
Contributor

Summary

Allow passing output-spec-id to the Spark writer so that we can choose which partition spec to write with. This is useful when a table has more than one active spec (e.g. daily and hourly partition specs).

Fixes #6932
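
As a rough illustration of the behavior this PR adds, the sketch below mimics an "output-spec-id" write option choosing among a table's partition specs. It is Python pseudocode under stated assumptions, not Iceberg's actual Java implementation; all names (resolve_output_spec, DAILY_SPEC_ID, the error message) are hypothetical.

```python
# Sketch only: mimics an "output-spec-id" write option selecting among a
# table's partition specs. Names here are illustrative, not Iceberg APIs.

DAILY_SPEC_ID = 0    # e.g. a spec partitioned by days(ts)
HOURLY_SPEC_ID = 1   # e.g. a spec partitioned by hours(ts)

TABLE_SPECS = {DAILY_SPEC_ID, HOURLY_SPEC_ID}  # spec ids known to the table
CURRENT_SPEC_ID = HOURLY_SPEC_ID               # the table's default spec

def resolve_output_spec(write_options):
    """Return the spec id to write with: the requested one if set,
    otherwise the table's current spec (the pre-PR behavior)."""
    requested = write_options.get("output-spec-id")
    if requested is None:
        return CURRENT_SPEC_ID
    spec_id = int(requested)
    if spec_id not in TABLE_SPECS:
        raise ValueError(f"Output spec id {spec_id} is not a valid spec id")
    return spec_id
```

Without the option, writes keep targeting the table's current (here, hourly) spec; with the option set to the daily spec's id, rows can still be written under the older spec, which is the multi-spec scenario described in #6932.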

Testing

  • Unit tests for Spark 3.1, 3.2 & 3.3

@github-actions github-actions bot added the spark label Mar 15, 2023
@gustavoatt
Contributor Author

@rdblue could you take a look when you have a chance? I wrote about the use-case for this feature on #6932.

Let me know what you think.

@edgarRd
Contributor

edgarRd commented Mar 15, 2023

Should this be in Spark 2.4 as well?

@gustavoatt
Contributor Author

@edgarRd I looked into it, but I'm not sure it is worthwhile to add to Spark 2.4:

  1. It is a much larger change, since the SparkWriter changed significantly from Spark 2 to Spark 3.
  2. I hit some issues with org.apache.iceberg.io.PartitionedWriter (which is only used by Spark 2.4) when trying to write to a non-default partition spec.

For our use case, adding this only for Spark 3 should be enough.

Collaborator

@szehon-ho szehon-ho left a comment

I think @RussellSpitzer was interested in this as well (for rewrite to different spec id).

Collaborator

@szehon-ho szehon-ho left a comment

This seems reasonable to me, left a few minor comments.

Would also ask @aokolnychyi to see if I am missing some potential problems by doing so.

Collaborator

@szehon-ho szehon-ho left a comment

Looks good to me, just a style nit

Contributor Author

@gustavoatt gustavoatt left a comment

Thank you for the review @szehon-ho!

Collaborator

@szehon-ho szehon-ho left a comment

Thanks for the code changes, those look good now. Missed some questions/comments on the test.

@szehon-ho
Collaborator

Thanks @gustavoatt for all the changes. Let me wait a little to see if there are any other concerns; I'll merge if not.

@@ -127,6 +129,16 @@ class SparkWrite {
    this.dsSchema = dsSchema;
    this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
    this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();

    if (writeConf.outputSpecId() == null) {
Contributor

Why not have this inside SparkWriteConf and make outputSpecId() return int?

Contributor Author

Done, I moved this logic to SparkWriteConf. The main reason I initially did not put it there was that I did not want to store the specs and current spec there, but I think that should be OK.
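
The refactor agreed on here (defaulting inside the conf so callers always get a concrete spec id) can be sketched as follows. This is Python pseudocode of the shape of the change, not the merged Java code; the class name, fields, and error message are illustrative guesses.

```python
# Illustrative sketch (not the actual Java): folding the null check that was
# in SparkWrite into the conf object, so output_spec_id() returns a plain int.

class SparkWriteConfSketch:
    def __init__(self, write_options, table_spec_ids, current_spec_id):
        self._options = write_options          # per-write options
        self._spec_ids = table_spec_ids        # spec ids stored on the conf
        self._current_spec_id = current_spec_id

    def output_spec_id(self):
        # Fall back to the table's current spec when the option is unset,
        # instead of returning None and checking in the writer.
        raw = self._options.get("output-spec-id")
        if raw is None:
            return self._current_spec_id
        spec_id = int(raw)
        if spec_id not in self._spec_ids:
            raise ValueError(f"Output spec id {spec_id} is not a valid spec id")
        return spec_id
```

With the check living in the conf, the writer never has to branch on a possibly-null spec id; it simply asks the conf for the spec to use.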

Contributor

We could keep a reference to Table, just like we do in SparkReadConf.

Contributor Author

Follow up PR #7348

@@ -115,6 +115,10 @@ public String wapId() {
    return sessionConf.get("spark.wap.id", null);
  }

  public Integer outputSpecId() {
Contributor

Initially we were not sure whether to make spec IDs public, but I also don't see a good alternative. I am OK with the overall idea of exposing this.

@aokolnychyi
Contributor

The change looks good to me, I had a few minor comments.

@gustavoatt
Contributor Author

Thanks for the review @aokolnychyi and @szehon-ho.

There is a check failure on this PR, but it looks unrelated to my changes:

* What went wrong:
Execution failed for task ':iceberg-spark:iceberg-spark-3.2_2.12:checkstyleMain'.
> A failure occurred while executing org.gradle.api.plugins.quality.internal.CheckstyleAction
   > An unexpected error occurred configuring and executing Checkstyle.
      > java.lang.Error: Error was thrown while processing /home/runner/work/iceberg/iceberg/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java

Deprecated Gradle features were used in this build, making it incompatible with Gradle 9.0.
You can use '--warning-mode all' to show the individual deprecation warnings and determine if they come from your own scripts or plugins.
See https://docs.gradle.org/8.0.2/userguide/command_line_interface.html#sec:command_line_warnings

499 actionable tasks: 499 executed

@szehon-ho
Collaborator

Yeah, I think it's appeared a few times; it's tracked by #7321. Let's just retrigger (close and re-open, for example).

@gustavoatt gustavoatt closed this Apr 13, 2023
@gustavoatt gustavoatt reopened this Apr 13, 2023
@@ -115,6 +121,20 @@ public String wapId() {
    return sessionConf.get("spark.wap.id", null);
  }

  public int outputSpecId() {
    final int outputSpecId =
Collaborator

Nit: Would you mind removing 'final' here? I know from @aokolnychyi (when he reviewed my change) that he prefers not to have extra finals except on class fields, since modern compilers usually add it anyway. ex: #4293 (comment)

Contributor Author

Done. I usually keep the final as a way of having something like const, to avoid accidentally modifying something I did not intend to. But I think it is not necessary in this case, and I would prefer to remove it to keep consistency with the repo.

@szehon-ho szehon-ho merged commit 522480d into apache:master Apr 13, 2023
21 checks passed
@szehon-ho
Collaborator

szehon-ho commented Apr 13, 2023

Merged, thanks @gustavoatt, and also @aokolnychyi and @edgarRd for the additional review.

@gustavoatt
Contributor Author

Thanks for the review and merging @szehon-ho @aokolnychyi!

Appreciate the effort spent reviewing!
