[SPARK-15420] [SQL] Add repartition and sort to prepare output data #13206
Conversation
This combines Hive's pre-insertion casts (without renames) that handle partitioning with the pre-insertion casts/renames in core. The combined rule, ResolveOutputColumns, will resolve columns by name or by position. Resolving by position detects cases where the number of columns is incorrect, or where the input columns are a permutation of the output columns, and fails the analysis. When resolving by name, each output column is located by name in the child plan. This handles cases where a subset of a data frame is written out.
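The two resolution modes described above can be illustrated with a minimal, standalone Python sketch. This is not Spark's `ResolveOutputColumns` rule itself; the function names and error messages are hypothetical, and columns are modeled as plain name lists.

```python
def resolve_by_position(input_cols, output_cols):
    # By-position resolution: fail when the column count is wrong, or when
    # the input columns look like a permutation of the output columns
    # (a likely sign of a mistaken column order).
    if len(input_cols) != len(output_cols):
        raise ValueError("number of columns is incorrect")
    if input_cols != output_cols and sorted(input_cols) == sorted(output_cols):
        raise ValueError("input columns are a permutation of the output columns")
    return list(zip(input_cols, output_cols))

def resolve_by_name(input_cols, output_cols):
    # By-name resolution: locate each output column by name in the child
    # plan. Extra input columns are ignored, which is what allows a subset
    # of a data frame to be written out.
    missing = [c for c in output_cols if c not in input_cols]
    if missing:
        raise ValueError("missing columns: %s" % missing)
    return [(c, c) for c in output_cols]
```

For example, `resolve_by_name(["a", "b", "c"], ["c", "a"])` succeeds even though the input has an extra column, while `resolve_by_position(["b", "a"], ["a", "b"])` fails with the permutation error.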
This PR now catches this problem during analysis and has a better error message. This commit updates the test for the new message and exception type.
Adding new arguments to InsertIntoTable requires changes to several files. Instead of adding a long list of optional args, this adds an options map, like the one passed to DataSource. Future options can be added and used only where they are needed.
This avoids an extra sort in the WriterContainer when data has already been sorted as part of the query plan. This fixes writes for both HadoopFsRelation and MetastoreRelation.
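The "skip the extra sort" check amounts to asking whether the ordering the query plan already guarantees satisfies the ordering the writer requires. A minimal Python sketch of that idea (the function name and list-of-column-names model are assumptions, not Spark's actual ordering classes):

```python
def needs_sort(required_ordering, child_ordering):
    # The writer's required ordering is satisfied when it is a prefix of
    # the ordering the incoming data already has; only then can the
    # WriterContainer skip its own sort.
    return child_ordering[:len(required_ordering)] != required_ordering
```

So data already sorted by `["bucket", "sort_col"]` needs no extra sort for a writer requiring `["bucket"]`, but unsorted data does.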
Test build #58915 has finished for PR 13206 at commit
This adds an optimizer rule that will add repartition and sort operations to the logical plan. Sort is added when the table has sort or bucketing columns. Repartition is added when writing columnar formats and the option "spark.sql.files.columnar.insertRepartition" is enabled. This also adds a `writersPerPartition(numTasks: Int)` option when writing that controls the number of files in each output table partition. The optimizer rule adds a repartition step that distributes output by partition and a random value in [0, numTasks).
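The distribution key described above, (partition values, random value in [0, numTasks)), is what bounds the number of files per output partition: each table partition's rows land in at most numTasks shuffle buckets. A small Python sketch of that key computation (the helper name is hypothetical; Spark computes this inside the added repartition step):

```python
import random

def output_bucket(partition_values, num_tasks, rng=None):
    # Distribute a row by its partition values plus a random value in
    # [0, num_tasks), so each table partition is spread across at most
    # num_tasks writer tasks, i.e. at most num_tasks files per partition.
    rng = rng or random.Random()
    return (tuple(partition_values), rng.randrange(num_tasks))
```

All rows sharing partition values map to one of only `num_tasks` distinct keys, which is the file-count bound `writersPerPartition` provides.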
(force-pushed from eed85ad to a64be8a)
Test build #58925 has finished for PR 13206 at commit
This may be fixed in #16898.
@rdblue, do you know if this is true?
@HyukjinKwon, that addresses part of what this patch does, but only for writes that go through FileFormatWriter. This patch works for Hive and adds an optimizer rule to add the sort instead of sorting in the writer, which I don't think is a great idea.
@rdblue What is the latest status of this PR?
We still maintain a version of this for our Spark builds to avoid an extra sort in Hive. If someone is willing to review it, I can probably find the time to rebase it on master. I think the year this sat initially was just because the 2.0 release was happening at the same time and there wasn't much bandwidth for reviews.
Test build #83583 has finished for PR 13206 at commit
Test build #95019 has finished for PR 13206 at commit
Test build #95964 has finished for PR 13206 at commit
@rdblue can this be merged?
@tooptoop4, this will be done in the DataSourceV2 work. I don't think that it is going to be done for v1 plans.
This is currently based on SPARK-14543 and includes its commits.
What changes were proposed in this pull request?
- Adds a sort to the plan when writing to Hive tables that have sort (sortBy) or bucketing columns.
- Adds DataFrameWriter#writersPerPartition(Int); when it is set, a repartition step is added to the plan. This enables users to easily control how many files per partition are created.

How was this patch tested?
WIP: adding tests.