Add support for UNION sql #1029
Some tests may not pass yet; I will fix them tomorrow. I may also need to add a test for
/// # }
/// ```
fn union(&self, dataframe: Arc<dyn DataFrame>) -> Result<Arc<dyn DataFrame>>;
fn union(
I think a new method, e.g. `union_distinct`, would make better sense: having booleans in APIs hurts readability.
Alternatively, what about `df.union(df2)?.distinct()?` and not exposing a new method?
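To make the two suggestions concrete, here is a minimal sketch using a toy, single-column frame. `ToyFrame` and the free function `union_distinct` are hypothetical stand-ins for illustration, not DataFusion's `DataFrame` trait; the point is only that distinct-union can be composed from `union` and `distinct` without a boolean parameter.

```rust
use std::collections::HashSet;

/// Toy stand-in for a DataFrame: a bag of single-column integer rows.
/// Hypothetical type for illustration only, not DataFusion's `DataFrame`.
#[derive(Debug, Clone, PartialEq)]
struct ToyFrame {
    rows: Vec<i64>,
}

impl ToyFrame {
    /// UNION ALL semantics: concatenate, keeping duplicates.
    fn union(&self, other: &ToyFrame) -> Result<ToyFrame, String> {
        let mut rows = self.rows.clone();
        rows.extend_from_slice(&other.rows);
        Ok(ToyFrame { rows })
    }

    /// Drop duplicate rows, keeping the first occurrence of each.
    fn distinct(&self) -> Result<ToyFrame, String> {
        let mut seen = HashSet::new();
        let rows = self
            .rows
            .iter()
            .copied()
            .filter(|r| seen.insert(*r)) // `insert` returns false for repeats
            .collect();
        Ok(ToyFrame { rows })
    }
}

/// UNION (distinct) expressed by composition; no boolean flag is needed.
fn union_distinct(a: &ToyFrame, b: &ToyFrame) -> Result<ToyFrame, String> {
    a.union(b)?.distinct()
}
```

With this shape, callers who want `UNION ALL` call `union` and callers who want `UNION` either chain `distinct` themselves or use the named helper, so the intent is visible at the call site.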
array_str += &*array_value_to_string(column, 1)?;
vec_array.push(column.clone());
}
if vec_str.contains(&array_str) {
Because the code uses a `Vec` holding all rows plus `contains`, this scales badly, namely O(n^2) instead of O(n). Besides that, it probably has extremely high overhead in terms of string formatting and memory usage, since all batches and rows are kept in memory converted to strings. Formatting to string is also not very robust: we don't guarantee that two different values won't be formatted as the same string value.
I think a cleaner / efficient way to go now would be to reuse the current implementations we have to drive the execution and only change the query planning.
#998 (comment)
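A small sketch of both concerns, under stated assumptions: `Value`, `to_display`, and the two dedup functions are hypothetical and only mimic the pattern under review (string keys in a `Vec` scanned with `contains`) next to a hash-based alternative on typed values. It is not the PR's code nor DataFusion's aggregate implementation.

```rust
use std::collections::HashSet;

/// A typed cell value; hypothetical, for illustration only.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Value {
    Int(i64),
    Utf8(String),
}

impl Value {
    /// Naive display formatting, similar in spirit to rendering a cell as text.
    fn to_display(&self) -> String {
        match self {
            Value::Int(v) => v.to_string(),
            Value::Utf8(s) => s.clone(),
        }
    }
}

/// Quadratic dedup: each row's formatted string is searched for in a Vec
/// of all keys seen so far.
fn dedup_by_string_scan(rows: &[Value]) -> Vec<Value> {
    let mut seen: Vec<String> = Vec::new();
    let mut out = Vec::new();
    for row in rows {
        let key = row.to_display();
        // Linear scan per row, so O(n^2) comparisons overall.
        if !seen.contains(&key) {
            seen.push(key);
            out.push(row.clone());
        }
    }
    out
}

/// Expected-linear dedup: hash the typed value itself, no string formatting.
fn dedup_by_hash(rows: &[Value]) -> Vec<Value> {
    let mut seen: HashSet<&Value> = HashSet::new();
    rows.iter().filter(|r| seen.insert(*r)).cloned().collect()
}
```

For the input `[Int(1), Utf8("1"), Int(1)]`, the string-based version keeps only one row because the integer and the string both format as `1`, while the hash-based version keeps the two genuinely distinct values.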
For some inspiration, here is the current `SELECT DISTINCT` implementation:
https://github.com/apache/arrow-datafusion/blob/master/datafusion/src/sql/planner.rs#L777
I agree with @Dandandan. Specifically, I think you could plan `SELECT x FROM foo UNION SELECT x FROM bar` by effectively creating the same plan as `SELECT DISTINCT x FROM (SELECT x FROM foo UNION ALL SELECT x FROM bar)`.
You can see the plan that gets made by running `EXPLAIN`:
```
explain select distinct x from ( select 1 as x UNION ALL select 1 as x);
+---------------+------------------------------------------------------------------------------+
| plan_type     | plan                                                                         |
+---------------+------------------------------------------------------------------------------+
| logical_plan  | Aggregate: groupBy=[[#x]], aggr=[[]]                                         |
|               |   Union                                                                      |
|               |     Projection: Int64(1) AS x                                                |
|               |       EmptyRelation                                                          |
|               |     Projection: Int64(1) AS x                                                |
|               |       EmptyRelation                                                          |
| physical_plan | HashAggregateExec: mode=FinalPartitioned, gby=[x@0 as x], aggr=[]            |
|               |   CoalesceBatchesExec: target_batch_size=4096                                |
|               |     RepartitionExec: partitioning=Hash([Column { name: "x", index: 0 }], 16) |
|               |       HashAggregateExec: mode=Partial, gby=[x@0 as x], aggr=[]               |
|               |         UnionExec                                                            |
|               |           RepartitionExec: partitioning=RoundRobinBatch(16)                  |
|               |             ProjectionExec: expr=[1 as x]                                    |
|               |               EmptyExec: produce_one_row=true                                |
|               |           RepartitionExec: partitioning=RoundRobinBatch(16)                  |
|               |             ProjectionExec: expr=[1 as x]                                    |
|               |               EmptyExec: produce_one_row=true                                |
+---------------+------------------------------------------------------------------------------+
```
(so use a `UnionExec` followed by `HashAggregateExec`)
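The planning-only approach can be sketched with a toy plan tree. Everything here (`Plan`, `columns`, `plan_union_distinct`) is a hypothetical model, not DataFusion's `LogicalPlan` or its planner API; it only shows the shape of the rewrite: wrap the `UNION ALL` in an aggregate that groups on every output column and has no aggregate expressions.

```rust
/// Toy logical plan; hypothetical types, not DataFusion's `LogicalPlan`.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// Leaf producing the named columns.
    Scan { table: String, columns: Vec<String> },
    /// UNION ALL of inputs that share a schema.
    Union { inputs: Vec<Plan> },
    /// Group by `group_by` and compute `aggr` (left empty for DISTINCT).
    Aggregate {
        input: Box<Plan>,
        group_by: Vec<String>,
        aggr: Vec<String>,
    },
}

impl Plan {
    /// Output column names of this node.
    fn columns(&self) -> Vec<String> {
        match self {
            Plan::Scan { columns, .. } => columns.clone(),
            // All inputs share a schema, so the first one is representative.
            Plan::Union { inputs } => inputs
                .first()
                .map(|p| p.columns())
                .unwrap_or_default(),
            Plan::Aggregate { group_by, aggr, .. } => {
                group_by.iter().chain(aggr).cloned().collect()
            }
        }
    }
}

/// Plan `left UNION right` as an aggregate with no aggregate expressions,
/// grouping on every output column of the UNION ALL.
fn plan_union_distinct(left: Plan, right: Plan) -> Plan {
    let union_all = Plan::Union { inputs: vec![left, right] };
    let group_by = union_all.columns();
    Plan::Aggregate {
        input: Box::new(union_all),
        group_by,
        aggr: vec![],
    }
}
```

Because the rewrite happens at plan-construction time, the existing union and hash-aggregate execution paths are reused unchanged, which matches the logical plan shown in the `EXPLAIN` output above.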
let stream = execute_stream(plan).await?;
common::collect(stream).await
let stream = execute_stream(plan.clone()).await?;
let any_plan = plan.as_any().downcast_ref::<UnionExec>();
The code to execute the `UnionExec` (if changed) should be changed there.
I suggest implementing it using the plan we have. If a more efficient implementation is possible, I think the best way would be to put that in a new node, i.e. `UnionDistinctExec`
and
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Union ALL or Union
is_all: bool,
Union all and union distinct are quite different, so if we need to add / change implementations, I think it makes sense to add a new node instead, like UnionDistinctExec
@xudong963 thanks for opening this PR! I have some comments on the current approach / direction, let me know what you think.
@Dandandan I agree with your idea in #998 (comment). After finishing the current approach, I also felt it was a poor fit. Thanks for your comments and guidance; I will redo the PR following #998 (comment). 💪🏻
No problem! Thanks for trying and looking forward to the next iteration 👍
I have finished my grueling 24-hour on-call shift. My one-week holiday is coming up, so I'll be focused on this PR!
Closing this PR; the new one is #1068.
Which issue does this PR close?
Closes #998
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?