Increase speed and parallelism of the limit algorithm and implement descending sorting #75

Merged: 7 commits, Nov 15, 2020
74 changes: 39 additions & 35 deletions dask_sql/physical/rel/logical/sort.py
@@ -1,8 +1,7 @@
from typing import Dict, List
from typing import List

import dask
import dask.dataframe as dd
from dask.highlevelgraph import HighLevelGraph
from dask.dataframe.core import new_dd_object
import pandas as pd
import dask.array as da

@@ -62,11 +61,6 @@ def _apply_sort(
first_sort_column = sort_columns[0]
first_sort_ascending = sort_ascending[0]

# Sort the first column with set_index. Currently, we can only handle ascending sort
if not first_sort_ascending:
raise NotImplementedError(
"The first column needs to be sorted ascending (yet)"
)
# We can only sort if there are no NaNs or infs.
# Therefore we need to do a single pass over the dataframe
# to warn the user
@@ -79,6 +73,12 @@
raise ValueError("Can not sort a column with NaNs")

df = df.set_index(first_sort_column, drop=False).reset_index(drop=True)
if not first_sort_ascending:
# As set_index().reset_index() always sorts ascending, we need to reverse
# the order inside all partitions and the order of the partitions themselves
df = dd.from_delayed(
reversed([self.reverse_partition(p) for p in df.to_delayed()])
)
You may want to become familiar with the df.partitions iterable.

dd.concat(df.partitions[::-1])

Going between delayed and dataframes introduces some inconvenience in graph-handling.
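
A minimal sketch of that suggestion, staying entirely inside the DataFrame collection (assuming df is already sorted ascending on the first sort column; the helper name reverse_sorted_frame is illustrative only):

import dask.dataframe as dd

def reverse_sorted_frame(df):
    # reverse the rows inside every partition ...
    reversed_inside = df.map_partitions(lambda part: part[::-1])
    # ... and then reverse the order of the partitions themselves
    parts = [reversed_inside.partitions[i] for i in range(reversed_inside.npartitions)]
    return dd.concat(parts[::-1])

This keeps the result a plain DataFrame graph and avoids the to_delayed/from_delayed round trip.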

Collaborator Author

Uh nice! That looks awesome. Yes, this is definitely much better.


At some point I think that we'll want to go through and remove all of the to/from_delayed calls. This isn't critical, but it's nice to do from a performance standpoint (happy to go into this in more depth if you like).

Collaborator Author

Thanks for your advice @mrocklin! Really appreciated. I would actually be really interested in your thoughts about this part.

You probably see it right away, but what I would like to achieve is basically two things:

  1. call a function on every partition, while also having the partition index as an input argument
  2. calculate partition_borders only when executing the DAG - not when creating it (this is what I did before this PR, and it slows down the process).

So far, the delayed function was all I could come up with, but I definitely think there is a better way.
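
One possible sketch that covers both points without leaving the DataFrame API (assuming a dask version whose map_partitions accepts Delayed arguments and the partition_info keyword; limit_frame and select_slice are hypothetical names, not the code in this PR):

import dask
import dask.dataframe as dd

def limit_frame(df, offset, end):
    # (2) the per-partition sizes stay lazy; the cumulative borders are wrapped
    # in a delayed call, so they are only evaluated when the graph actually runs
    partition_borders = dask.delayed(lambda sizes: sizes.cumsum().to_dict())(
        df.map_partitions(len)
    )

    # (1) the selection function receives its own partition number through the
    # partition_info keyword that map_partitions fills in
    def select_slice(part, borders, partition_info=None):
        index = partition_info["number"]
        left = borders[index - 1] if index > 0 else 0
        right = borders[index]
        from_index = max(offset - left, 0)
        to_index = max(min(end, right) - left, 0)
        return part.iloc[from_index:to_index]

    return df.map_partitions(select_slice, partition_borders, meta=df._meta)

Whether this is actually faster than the delayed-based variant would need to be measured; it is only meant as a starting point.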

Collaborator Author

I have created an issue to fix this, so we can go on with this PR.


# sort the remaining columns if given
if len(sort_columns) > 1:
@@ -94,8 +94,7 @@ def _apply_offset(self, df: dd.DataFrame, offset: int, end: int) -> dd.DataFrame
Limit the dataframe to the window [offset, end].
That is unfortunately, not so simple as we do not know how many
items we have in each partition. We have therefore no other way than to
calculate (!!!) the sizes of each partition
(this means we need to compute the dataframe already here).
calculate (!!!) the sizes of each partition.

After that, we can create a new dataframe from the old
dataframe by calculating for each partition if and how much
@@ -104,24 +103,32 @@ def _apply_offset(self, df: dd.DataFrame, offset: int, end: int) -> dd.DataFrame
we need to pass the partition number to the selection
function, which is not possible with normal "map_partitions".
"""
# As we need to calculate the partition size, we better persist
# the df. I think...
# TODO: check if this is the best thing to do
df = df.persist()
if not offset:
# We do a (hopefully) very quick check: if the first partition
# is already enough, we will just use this
first_partition_length = df.map_partitions(lambda x: len(x)).to_delayed()[0]
first_partition_length = first_partition_length.compute()
len(df.partitions[0])

if first_partition_length >= end:
return df.head(end, compute=False)

# First, we need to find out which partitions we want to use.
# Therefore we count the total number of entries
partition_borders = df.map_partitions(lambda x: len(x)).compute()
partition_borders = partition_borders.cumsum().to_dict()
partition_borders = df.map_partitions(lambda x: len(x))

# Now we let each of the partitions figure out how much it needs to return
# using these partition borders
# For this, we generate our own dask computation graph (as it does not really)
# fit well with one of the already present methods
# For this, we generate our own dask computation graph (as it does not really
# fit well with one of the already present methods).

# (a) we define a method to be calculated on each partition
# This method returns the part of the partition, which falls between [offset, fetch]
def select_from_to(df, partition_index):
# Please note that the dask object "partition_borders" will be turned into
# its pandas representation at this point and we can calculate the cumsum
# (which is not possible on the dask object). Recalculating it should not cost
# us much, as we assume the number of partitions is rather small.
@dask.delayed
def select_from_to(df, partition_index, partition_borders):
partition_borders = partition_borders.cumsum().to_dict()
this_partition_border_left = (
partition_borders[partition_index - 1] if partition_index > 0 else 0
)
Expand All @@ -141,20 +148,17 @@ def select_from_to(df, partition_index):

return df.iloc[from_index:to_index]

# Then we (b) define a task graph. It should calculate the function above on each of the partitions of
# df (specified by (df._name, i) for each partition i). As an argument, we pass the partition_index.
dask_graph_name = df._name + "-limit"
dask_graph_dict = {}

for partition_index in range(df.npartitions):
dask_graph_dict[(dask_graph_name, partition_index)] = (
select_from_to,
(df._name, partition_index),
partition_index,
)

# We replace df with our new graph
graph = HighLevelGraph.from_collections(
dask_graph_name, dask_graph_dict, dependencies=[df]
# (b) Now we just need to apply the function on every partition
# We do this via the delayed interface, which seems to be the easiest option.
return dd.from_delayed(
[
select_from_to(partition, partition_number, partition_borders)
for partition_number, partition in enumerate(df.to_delayed())
]
)
return new_dd_object(graph, dask_graph_name, df._meta, df.divisions)

@staticmethod
@dask.delayed
def reverse_partition(partition):
"""Small helper function to revert a partition in dask"""
return partition[::-1]
8 changes: 4 additions & 4 deletions docs/pages/sql.rst
@@ -269,15 +269,15 @@ Limitations
Whenever you find a not already implemented operation, keyword
or functionality, please raise an issue at our `issue tracker <https://github.com/nils-braun/dask-sql/issues>`_ with your use-case.

Apart from those functional limitations, there are also two operations which need special care: ``ORDER BY`` and ``LIMIT``.
Apart from those functional limitations, there is an operation which needs special care: ``ORDER BY``.
Normally, ``dask-sql`` calls create a ``dask`` data frame, which gets only computed when you call the ``.compute()`` member.
Due to internal constraints, this is currently not the case for ``ORDER BY`` and ``LIMIT``.
Including one of those operations will trigger a calculation of the full data frame already when calling ``Context.sql()``.
Due to internal constraints, this is currently not the case for ``ORDER BY``.
Including this operation will trigger a computation of the full data frame as soon as ``Context.sql()`` is called.

.. warning::

There is a subtle but important difference between adding ``LIMIT 10`` to your SQL query and calling ``sql(...).head(10)``.
The data inside ``dask`` is partitioned, to distribute it over the cluster.
``head`` will only return the first N elements from the first partition - even if N is larger than the partition size.
As a benefit, calling ``.head(N)`` is typically faster than calculating the full data sample with ``.compute()``.
``LIMIT`` on the other hand will always return the first N elements - no matter on how many partitions they are scattered - but will also need to compute the full data set for this.
``LIMIT`` on the other hand will always return the first N elements - no matter how many partitions they are scattered over - but it also needs to precompute the first partition to find out whether it has to look at all the data or not.
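
To make the difference concrete, a small sketch (assuming a ``Context`` named ``c`` with a registered table ``my_table``; the names are illustrative only)::

    # LIMIT: always returns the first 10 rows, but may have to inspect every partition
    df_limit = c.sql("SELECT * FROM my_table LIMIT 10")

    # head: only ever looks at the first partition, even if it holds fewer than 10 rows
    df_head = c.sql("SELECT * FROM my_table").head(10)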
101 changes: 72 additions & 29 deletions tests/integration/test_sort.py
@@ -7,38 +7,85 @@
import dask.dataframe as dd


def test_sort(c, user_table_1):
df = c.sql(
def test_sort(c, user_table_1, df):
df_result = c.sql(
"""
SELECT
*
FROM user_table_1
ORDER BY b, user_id DESC
"""
)
df = df.compute().reset_index(drop=True)
df_result = df_result.compute().reset_index(drop=True)
df_expected = user_table_1.sort_values(
["b", "user_id"], ascending=[True, False]
).reset_index(drop=True)

assert_frame_equal(df, df_expected)
assert_frame_equal(df_result, df_expected)

df_result = c.sql(
"""
SELECT
*
FROM df
ORDER BY b DESC, a DESC
"""
)
df_result = df_result.compute()
df_expected = df.sort_values(["b", "a"], ascending=[False, False])

assert_frame_equal(
df_result.reset_index(drop=True), df_expected.reset_index(drop=True)
)

df_result = c.sql(
"""
SELECT
*
FROM df
ORDER BY a DESC, b
"""
)
df_result = df_result.compute()
df_expected = df.sort_values(["a", "b"], ascending=[False, True])

assert_frame_equal(
df_result.reset_index(drop=True), df_expected.reset_index(drop=True)
)

df_result = c.sql(
"""
SELECT
*
FROM df
ORDER BY b, a
"""
)
df_result = df_result.compute()
df_expected = df.sort_values(["b", "a"], ascending=[True, True])

assert_frame_equal(
df_result.reset_index(drop=True), df_expected.reset_index(drop=True)
)


def test_sort_by_alias(c, user_table_1):
df = c.sql(
df_result = c.sql(
"""
SELECT
b AS my_column
FROM user_table_1
ORDER BY my_column, user_id DESC
"""
)
df = df.compute().reset_index(drop=True).rename(columns={"my_column": "b"})
df_result = (
df_result.compute().reset_index(drop=True).rename(columns={"my_column": "b"})
)
df_expected = user_table_1.sort_values(
["b", "user_id"], ascending=[True, False]
).reset_index(drop=True)[["b"]]

assert_frame_equal(df, df_expected)
assert_frame_equal(df_result, df_expected)


def test_sort_with_nan(c):
@@ -67,52 +114,48 @@ def test_sort_strings(c):
string_table = pd.DataFrame({"a": ["zzhsd", "öfjdf", "baba"]})
c.create_table("string_table", string_table)

df = c.sql(
df_result = c.sql(
"""
SELECT
*
FROM string_table
ORDER BY a
"""
)
df = df.compute().reset_index(drop=True)
df_result = df_result.compute().reset_index(drop=True)
df_expected = string_table.sort_values(["a"], ascending=True).reset_index(drop=True)

assert_frame_equal(df, df_expected)
assert_frame_equal(df_result, df_expected)


def test_sort_not_allowed(c):
# No DESC implemented for the first column
with pytest.raises(NotImplementedError):
c.sql("SELECT * FROM user_table_1 ORDER BY b DESC")

# Wrong column
with pytest.raises(Exception):
c.sql("SELECT * FROM user_table_1 ORDER BY 42")


def test_limit(c, long_table):
df = c.sql("SELECT * FROM long_table LIMIT 101")
df = df.compute()
df_result = c.sql("SELECT * FROM long_table LIMIT 101")
df_result = df_result.compute()

assert_frame_equal(df, long_table.iloc[:101])
assert_frame_equal(df_result, long_table.iloc[:101])

df = c.sql("SELECT * FROM long_table LIMIT 100")
df = df.compute()
df_result = c.sql("SELECT * FROM long_table LIMIT 100")
df_result = df_result.compute()

assert_frame_equal(df, long_table.iloc[:100])
assert_frame_equal(df_result, long_table.iloc[:100])

df = c.sql("SELECT * FROM long_table LIMIT 100 OFFSET 99")
df = df.compute()
df_result = c.sql("SELECT * FROM long_table LIMIT 100 OFFSET 99")
df_result = df_result.compute()

assert_frame_equal(df, long_table.iloc[99 : 99 + 100])
assert_frame_equal(df_result, long_table.iloc[99 : 99 + 100])

df = c.sql("SELECT * FROM long_table LIMIT 100 OFFSET 100")
df = df.compute()
df_result = c.sql("SELECT * FROM long_table LIMIT 100 OFFSET 100")
df_result = df_result.compute()

assert_frame_equal(df, long_table.iloc[100 : 100 + 100])
assert_frame_equal(df_result, long_table.iloc[100 : 100 + 100])

df = c.sql("SELECT * FROM long_table LIMIT 101 OFFSET 101")
df = df.compute()
df_result = c.sql("SELECT * FROM long_table LIMIT 101 OFFSET 101")
df_result = df_result.compute()

assert_frame_equal(df, long_table.iloc[101 : 101 + 101])
assert_frame_equal(df_result, long_table.iloc[101 : 101 + 101])