Increase speed and parallelism of the limit algorithm and implement descending sorting #75

Merged
merged 7 commits into from
Nov 15, 2020
66 changes: 31 additions & 35 deletions dask_sql/physical/rel/logical/sort.py
@@ -1,8 +1,7 @@
from typing import Dict, List
from typing import List

import dask
import dask.dataframe as dd
from dask.highlevelgraph import HighLevelGraph
from dask.dataframe.core import new_dd_object
import pandas as pd
import dask.array as da

@@ -62,11 +61,6 @@ def _apply_sort(
first_sort_column = sort_columns[0]
first_sort_ascending = sort_ascending[0]

# Sort the first column with set_index. Currently, we can only handle ascending sort
if not first_sort_ascending:
raise NotImplementedError(
"The first column needs to be sorted ascending (yet)"
)
# We can only sort if there are no NaNs or infs.
# Therefore we need to do a single pass over the dataframe
# to warn the user
@@ -79,6 +73,11 @@
raise ValueError("Can not sort a column with NaNs")

df = df.set_index(first_sort_column, drop=False).reset_index(drop=True)
if not first_sort_ascending:
# As set_index().reset_index() always sorts ascending, we need to reverse
# the order inside all partitions and the order of the partitions themselves
df = df.map_partitions(lambda partition: partition[::-1], meta=df)
df = df.partitions[::-1]

# sort the remaining columns if given
if len(sort_columns) > 1:
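
The reversal trick added in this hunk boils down to the following standalone sketch (the example data and the column name "a" are purely illustrative and not part of this change): sort ascending via set_index/reset_index, then reverse the rows inside every partition as well as the order of the partitions themselves.

import dask.dataframe as dd
import pandas as pd

# Illustrative data; "a" is a made-up column name.
pdf = pd.DataFrame({"a": [3, 1, 2, 5, 4]})
df = dd.from_pandas(pdf, npartitions=2)

# Ascending sort on the first column (what dask supports natively) ...
df = df.set_index("a", drop=False).reset_index(drop=True)

# ... then reverse within each partition and reverse the partition order
# to turn the ascending sort into a descending one.
df = df.map_partitions(lambda partition: partition[::-1], meta=df)
df = df.partitions[::-1]

print(df.compute()["a"].tolist())  # expected: [5, 4, 3, 2, 1]
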
@@ -94,8 +93,7 @@ def _apply_offset(self, df: dd.DataFrame, offset: int, end: int) -> dd.DataFrame
Limit the dataframe to the window [offset, end].
That is unfortunately, not so simple as we do not know how many
items we have in each partition. We have therefore no other way than to
calculate (!!!) the sizes of each partition
(this means we need to compute the dataframe already here).
calculate (!!!) the sizes of each partition.

After that, we can create a new dataframe from the old
dataframe by calculating for each partition if and how much
@@ -104,24 +102,31 @@ def _apply_offset(self, df: dd.DataFrame, offset: int, end: int) -> dd.DataFrame
we need to pass the partition number to the selection
function, which is not possible with normal "map_partitions".
"""
# As we need to calculate the partition size, we better persist
# the df. I think...
# TODO: check if this is the best thing to do
df = df.persist()
if not offset:
# We do a (hopefully) very quick check: if the first partition
# is already enough, we will just use it
first_partition_length = len(df.partitions[0])
if first_partition_length >= end:
return df.head(end, compute=False)

# First, we need to find out which partitions we want to use.
# Therefore we count the total number of entries
partition_borders = df.map_partitions(lambda x: len(x)).compute()
partition_borders = partition_borders.cumsum().to_dict()
partition_borders = df.map_partitions(lambda x: len(x))

# Now we let each of the partitions figure out, how much it needs to return
# using these partition borders
# For this, we generate out own dask computation graph (as it does not really)
# fit well with one of the already present methods
# For this, we generate our own dask computation graph (as it does not really
# fit well with one of the already present methods).

# (a) we define a method to be calculated on each partition
# This method returns the part of the partition, which falls between [offset, fetch]
def select_from_to(df, partition_index):
# Please note that the dask object "partition_borders" will be turned into
# its pandas representation at this point and we can calculate the cumsum
# (which is not possible on the dask object). Recalculating it should not cost
# us much, as we assume the number of partitions is rather small.
@dask.delayed
def select_from_to(df, partition_index, partition_borders):
partition_borders = partition_borders.cumsum().to_dict()
this_partition_border_left = (
partition_borders[partition_index - 1] if partition_index > 0 else 0
)
@@ -141,20 +146,11 @@ def select_from_to(df, partition_index):

return df.iloc[from_index:to_index]

# Then we (b) define a task graph. It should calculate the function above on each of the partitions of
# df (specified by (df._name, i) for each partition i). As an argument, we pass the partition_index.
dask_graph_name = df._name + "-limit"
dask_graph_dict = {}

for partition_index in range(df.npartitions):
dask_graph_dict[(dask_graph_name, partition_index)] = (
select_from_to,
(df._name, partition_index),
partition_index,
)

# We replace df with our new graph
graph = HighLevelGraph.from_collections(
dask_graph_name, dask_graph_dict, dependencies=[df]
# (b) Now we just need to apply the function on every partition
# We do this via the delayed interface, which seems to be the easiest option.
return dd.from_delayed(
[
select_from_to(partition, partition_number, partition_borders)
for partition_number, partition in enumerate(df.partitions)
]
)
return new_dd_object(graph, dask_graph_name, df._meta, df.divisions)
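
The new limit algorithm in this file can be summarized outside of the diff. The following is a self-contained sketch of the same idea (the data, the window and the variable names are illustrative only). It relies on the fact that dask collections passed as arguments to a dask.delayed function are materialized before the function runs, so the per-partition lengths arrive as a plain pandas Series whose cumulative sum gives the partition borders.

import dask
import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"x": range(10)})
df = dd.from_pandas(pdf, npartitions=3)

offset, end = 4, 8  # keep the rows in the half-open window [offset, end)

# One length per partition; this stays lazy until the delayed calls below run.
partition_borders = df.map_partitions(lambda partition: len(partition))

@dask.delayed
def select_from_to(partition, partition_index, partition_borders):
    # At this point "partition_borders" is a concrete pandas Series
    # (one entry per partition), so the cumulative sum is cheap.
    partition_borders = partition_borders.cumsum().to_dict()
    left = partition_borders[partition_index - 1] if partition_index > 0 else 0
    right = partition_borders[partition_index]

    if end <= left or offset >= right:
        # This partition lies completely outside of the requested window.
        return partition.iloc[0:0]

    from_index = max(offset - left, 0)
    to_index = min(end, right) - left
    return partition.iloc[from_index:to_index]

result = dd.from_delayed(
    [
        select_from_to(partition, partition_number, partition_borders)
        for partition_number, partition in enumerate(df.partitions)
    ]
)
print(result.compute())  # the rows with x = 4 .. 7

Compared to the replaced implementation, nothing is persisted or computed eagerly here; the partition lengths become part of the same task graph as the per-partition selection.
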
8 changes: 4 additions & 4 deletions docs/pages/sql.rst
@@ -301,15 +301,15 @@ Limitatons
Whenever you find a not already implemented operation, keyword
or functionality, please raise an issue at our `issue tracker <https://github.com/nils-braun/dask-sql/issues>`_ with your use-case.

Apart from those functional limitations, there are also two operations which need special care: ``ORDER BY`` and ``LIMIT``.
Apart from those functional limitations, there is one operation which needs special care: ``ORDER BY``.
Normally, ``dask-sql`` calls create a ``dask`` data frame, which gets only computed when you call the ``.compute()`` member.
Due to internal constraints, this is currently not the case for ``ORDER BY`` and ``LIMIT``.
Including one of those operations will trigger a calculation of the full data frame already when calling ``Context.sql()``.
Due to internal constraints, this is currently not the case for ``ORDER BY``.
Including this operation will already trigger a computation of the full data frame when calling ``Context.sql()``.

.. warning::

There is a subtle but important difference between adding ``LIMIT 10`` to your SQL query and calling ``sql(...).head(10)``.
The data inside ``dask`` is partitioned, to distribute it over the cluster.
``head`` will only return the first N elements from the first partition - even if N is larger than the partition size.
As a benefit, calling ``.head(N)`` is typically faster than calculating the full data sample with ``.compute()``.
``LIMIT`` on the other hand will always return the first N elements - no matter on how many partitions they are scattered - but will also need to compute the full data set for this.
``LIMIT``, on the other hand, will always return the first N elements - no matter how many partitions they are scattered over - but it also needs to precompute the first partition to find out whether it has to look at the full data or not.
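
A small, hypothetical snippet illustrating the difference described above (the table name ``my_table`` and the data are made up): with four partitions of five rows each, ``.head(10)`` only draws from the first partition and therefore comes back with just five rows, while ``LIMIT 10`` collects rows across partition boundaries and returns all ten.

import pandas as pd
import dask.dataframe as dd
from dask_sql import Context

# Four partitions with five rows each.
df = dd.from_pandas(pd.DataFrame({"a": range(20)}), npartitions=4)

c = Context()
c.create_table("my_table", df)

# head(10) only looks at the first partition and therefore returns 5 rows
# here (dask warns about the insufficient number of elements).
head_result = c.sql("SELECT * FROM my_table").head(10)

# LIMIT 10 always returns 10 rows, even when they span several partitions.
limit_result = c.sql("SELECT * FROM my_table LIMIT 10").compute()

print(len(head_result), len(limit_result))  # expected: 5 10
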
101 changes: 72 additions & 29 deletions tests/integration/test_sort.py
@@ -7,38 +7,85 @@
import dask.dataframe as dd


def test_sort(c, user_table_1):
df = c.sql(
def test_sort(c, user_table_1, df):
df_result = c.sql(
"""
SELECT
*
FROM user_table_1
ORDER BY b, user_id DESC
"""
)
df = df.compute().reset_index(drop=True)
df_result = df_result.compute().reset_index(drop=True)
df_expected = user_table_1.sort_values(
["b", "user_id"], ascending=[True, False]
).reset_index(drop=True)

assert_frame_equal(df, df_expected)
assert_frame_equal(df_result, df_expected)

df_result = c.sql(
"""
SELECT
*
FROM df
ORDER BY b DESC, a DESC
"""
)
df_result = df_result.compute()
df_expected = df.sort_values(["b", "a"], ascending=[False, False])

assert_frame_equal(
df_result.reset_index(drop=True), df_expected.reset_index(drop=True)
)

df_result = c.sql(
"""
SELECT
*
FROM df
ORDER BY a DESC, b
"""
)
df_result = df_result.compute()
df_expected = df.sort_values(["a", "b"], ascending=[False, True])

assert_frame_equal(
df_result.reset_index(drop=True), df_expected.reset_index(drop=True)
)

df_result = c.sql(
"""
SELECT
*
FROM df
ORDER BY b, a
"""
)
df_result = df_result.compute()
df_expected = df.sort_values(["b", "a"], ascending=[True, True])

assert_frame_equal(
df_result.reset_index(drop=True), df_expected.reset_index(drop=True)
)


def test_sort_by_alias(c, user_table_1):
df = c.sql(
df_result = c.sql(
"""
SELECT
b AS my_column
FROM user_table_1
ORDER BY my_column, user_id DESC
"""
)
df = df.compute().reset_index(drop=True).rename(columns={"my_column": "b"})
df_result = (
df_result.compute().reset_index(drop=True).rename(columns={"my_column": "b"})
)
df_expected = user_table_1.sort_values(
["b", "user_id"], ascending=[True, False]
).reset_index(drop=True)[["b"]]

assert_frame_equal(df, df_expected)
assert_frame_equal(df_result, df_expected)


def test_sort_with_nan(c):
@@ -67,52 +114,48 @@ def test_sort_strings(c):
string_table = pd.DataFrame({"a": ["zzhsd", "öfjdf", "baba"]})
c.create_table("string_table", string_table)

df = c.sql(
df_result = c.sql(
"""
SELECT
*
FROM string_table
ORDER BY a
"""
)
df = df.compute().reset_index(drop=True)
df_result = df_result.compute().reset_index(drop=True)
df_expected = string_table.sort_values(["a"], ascending=True).reset_index(drop=True)

assert_frame_equal(df, df_expected)
assert_frame_equal(df_result, df_expected)


def test_sort_not_allowed(c):
# No DESC implemented for the first column
with pytest.raises(NotImplementedError):
c.sql("SELECT * FROM user_table_1 ORDER BY b DESC")

# Wrong column
with pytest.raises(Exception):
c.sql("SELECT * FROM user_table_1 ORDER BY 42")


def test_limit(c, long_table):
df = c.sql("SELECT * FROM long_table LIMIT 101")
df = df.compute()
df_result = c.sql("SELECT * FROM long_table LIMIT 101")
df_result = df_result.compute()

assert_frame_equal(df, long_table.iloc[:101])
assert_frame_equal(df_result, long_table.iloc[:101])

df = c.sql("SELECT * FROM long_table LIMIT 100")
df = df.compute()
df_result = c.sql("SELECT * FROM long_table LIMIT 100")
df_result = df_result.compute()

assert_frame_equal(df, long_table.iloc[:100])
assert_frame_equal(df_result, long_table.iloc[:100])

df = c.sql("SELECT * FROM long_table LIMIT 100 OFFSET 99")
df = df.compute()
df_result = c.sql("SELECT * FROM long_table LIMIT 100 OFFSET 99")
df_result = df_result.compute()

assert_frame_equal(df, long_table.iloc[99 : 99 + 100])
assert_frame_equal(df_result, long_table.iloc[99 : 99 + 100])

df = c.sql("SELECT * FROM long_table LIMIT 100 OFFSET 100")
df = df.compute()
df_result = c.sql("SELECT * FROM long_table LIMIT 100 OFFSET 100")
df_result = df_result.compute()

assert_frame_equal(df, long_table.iloc[100 : 100 + 100])
assert_frame_equal(df_result, long_table.iloc[100 : 100 + 100])

df = c.sql("SELECT * FROM long_table LIMIT 101 OFFSET 101")
df = df.compute()
df_result = c.sql("SELECT * FROM long_table LIMIT 101 OFFSET 101")
df_result = df_result.compute()

assert_frame_equal(df, long_table.iloc[101 : 101 + 101])
assert_frame_equal(df_result, long_table.iloc[101 : 101 + 101])