
Increase speed and parallelism of the limit algorithm and implement descending sorting #75

Merged
merged 7 commits into main from feature/faster-limiting on Nov 15, 2020

Conversation

nils-braun (Collaborator) commented Nov 6, 2020

This PR introduces four changes:

  • it simplifies the logic for extracting the piece of the Dask dataframe that starts at OFFSET and ends at LIMIT.
  • the dataframe no longer needs to be computed up front, but only at the actual calculation. The computed dataframe used to be needed for getting the partition boundaries, but these can now also be calculated "lazily".
  • as there is no need for a full recalculation, I introduced a quick shortcut: if the first partition is already enough, just return it. This is a very typical use case when you just do a "LIMIT 10".
  • it also allows for descending sorting on the first column.
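The slicing logic behind the first two points can be sketched in plain pandas (hypothetical helper name, not the PR's actual code): each partition only needs to know how many rows come before it to map the global [OFFSET, OFFSET + LIMIT) window into its own local coordinates.

```python
import pandas as pd

def limit_partition(partition, rows_before, offset, limit):
    """Keep the rows of this partition that fall into the
    global window [offset, offset + limit)."""
    local_start = max(offset - rows_before, 0)
    local_end = max(min(offset + limit - rows_before, len(partition)), 0)
    return partition.iloc[local_start:local_end]

# Three "partitions" of 4 rows each; request OFFSET 5, LIMIT 4.
partitions = [pd.DataFrame({"x": range(i * 4, (i + 1) * 4)}) for i in range(3)]
offset, limit = 5, 4

pieces, rows_before = [], 0
for partition in partitions:
    pieces.append(limit_partition(partition, rows_before, offset, limit))
    rows_before += len(partition)

result = pd.concat(pieces)
print(result["x"].tolist())  # [5, 6, 7, 8]
```

Since each partition only needs its own cumulative offset, the same per-partition function can run lazily inside the dask graph instead of requiring an up-front computation.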

@nils-braun nils-braun changed the title [WIP] Increase speed and parallelism of the limit algorithm Increase speed and parallelism of the limit algorithm and implement descending sorting Nov 9, 2020
Comment on lines 79 to 81
df = dd.from_delayed(
    reversed([self.reverse_partition(p) for p in df.to_delayed()])
)

You may want to become familiar with the df.partitions iterable.

dd.concat(df.partitions[::-1])

Going between delayed and dataframes introduces some inconvenience in graph-handling.

Collaborator Author


Uh nice! That looks awesome. Yes, this is definitely much better.


At some point I think that we'll want to go through and remove all of the to/from_delayed calls. This isn't critical, but it's nice to do from a performance standpoint (happy to go into this in more depth if you like).

Collaborator Author


Thanks for your advice @mrocklin! Really appreciated. I would actually be really interested in your thoughts about this part.

You probably see it right away, but what I would like to achieve is basically two things:

  1. call a function on every partition, while also having the partition index as an input argument
  2. calculate partition_borders only when executing the DAG, not already when creating it (this is what I did before this PR, and it slows down the process).

So far, the delayed function was all I could come up with, but I definitely think there is a better way.

Collaborator Author


I have created an issue to fix this, so we can go on with this PR.

# We do a (hopefully) very quick check: if the first partition
# is already enough, we will just use this
first_partition_length = df.map_partitions(lambda x: len(x)).to_delayed()[0]
first_partition_length = first_partition_length.compute()


len(df.partitions[0])

@codecov-io commented Nov 15, 2020

Codecov Report

Merging #75 (2bc8920) into main (8abc48c) will not change coverage.
The diff coverage is 100.00%.


@@            Coverage Diff            @@
##              main       #75   +/-   ##
=========================================
  Coverage   100.00%   100.00%           
=========================================
  Files           34        34           
  Lines         1383      1383           
  Branches       185       189    +4     
=========================================
  Hits          1383      1383           
Impacted Files Coverage Δ
dask_sql/physical/rel/logical/sort.py 100.00% <100.00%> (ø)


Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@nils-braun nils-braun merged commit 02e2dad into main Nov 15, 2020
@nils-braun nils-braun deleted the feature/faster-limiting branch November 15, 2020 21:19