Input from cluster memory #68

Merged: 9 commits, Nov 3, 2020
15 changes: 10 additions & 5 deletions dask_sql/physical/rel/custom/create.py
@@ -1,4 +1,5 @@
 import logging
+from distributed.client import default_client
 import pandas as pd
 import dask.dataframe as dd

@@ -68,12 +69,16 @@ def convert(
         except KeyError:
             raise AttributeError("Parameters must include a 'location' parameter.")
 
-        read_function_name = f"read_{format}"
+        if format == "memory":
+            client = default_client()
+            read_function = client.get_dataset
+        else:
+            read_function_name = f"read_{format}"
 
-        try:
-            read_function = getattr(dd, read_function_name)
-        except AttributeError:
-            raise AttributeError(f"Do not understand input format {format}.")
+            try:
+                read_function = getattr(dd, read_function_name)
+            except AttributeError:
+                raise AttributeError(f"Do not understand input format {format}.")
 
         df = read_function(location, **kwargs)

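The new branch resolves ``format = 'memory'`` by asking the scheduler for a published dataset instead of calling one of dask's ``read_<format>`` functions. A minimal sketch of that mechanism (hypothetical names; assumes a running ``distributed`` cluster):

.. code-block:: python

from dask.distributed import Client
import dask.datasets

# connect to a cluster (or, without an address, start a local one)
client = Client()

# publish a named dataset on the scheduler...
df = dask.datasets.timeseries()
client.publish_dataset(my_ds=df)

# ...which any client (and hence the new 'memory' code path) can fetch by name
same_df = client.get_dataset("my_ds")
print(same_df.head())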
1 change: 1 addition & 0 deletions docs/index.rst
@@ -67,6 +67,7 @@ but any other data (from disk, S3, API, hdfs) can be used.
pages/installation
pages/quickstart
pages/sql
pages/data_input
pages/custom
pages/api
pages/server
86 changes: 86 additions & 0 deletions docs/pages/data_input.rst
@@ -0,0 +1,86 @@
.. _data_input:

Data Loading and Input
======================

Before data can be queried with ``dask-sql``, it needs to be loaded into the dask cluster (or local instance) and registered with the :class:`dask_sql.Context`.
For this, ``dask-sql`` leverages the wide range of possible `input formats <https://docs.dask.org/en/latest/dataframe-create.html>`_ that ``dask`` supports.
There are several ways to load input data into ``dask-sql``:

1. Load it with dask via python
-------------------------------

You can either use an already created dask dataframe or create one using one of the ``read_<format>`` functions from ``dask``.
Chances are high that there already exists a function to load your favorite format or location (e.g. S3 or HDFS).
Make sure to install the required libraries on both the driver and the worker machines.

.. code-block:: python

import dask.dataframe as dd
from dask_sql import Context

c = Context()
df = dd.read_csv("s3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv")

c.register_dask_table(df, "my_data")

2. Load it via SQL
------------------

If you are connected to the SQL server implementation, or if you do not want to issue Python calls, you can also
load the data via SQL only.

.. code-block:: sql

CREATE TABLE my_data WITH (
format = 'csv',
location = 's3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv'
)

You can find more information in :ref:`creation`.


3. Persist and share data on the cluster
----------------------------------------

In ``dask``, you can publish datasets with names into the cluster memory.
This allows you to reuse the same data from multiple clients/users in multiple sessions.

For example, you can publish your data using the ``client.publish_dataset`` function of the ``distributed.Client``,
and then later register it in the :class:`dask_sql.Context` via SQL:

.. code-block:: python

from dask.distributed import Client

# a dask.distributed Client connected to your cluster
client = Client(...)
client.publish_dataset(my_ds=df)

Later in SQL:

.. code-block:: sql

CREATE TABLE my_data WITH (
format = 'memory',
location = 'my_ds'
)

Note that the format is set to ``memory`` and the location is the name that was chosen when publishing the dataset.

To achieve the same thing from Python, you can use ``dask``'s methods to retrieve the dataset:

.. code-block:: python

df = client.get_dataset("my_ds")
c.register_dask_table(df, "my_data")


.. note::

For ``dask-sql`` it does not matter how you load your data.
In all of the shown cases, you can then use the specified table name to query your data
in a ``SELECT`` call.

Please note, however, that un-persisted data will be re-read from its source (e.g. on S3 or disk)
on every query, whereas persisted data is only read once.
Persisting therefore increases the query speed, but it also prevents you from seeing external
updates to your data (until you reload it explicitly).
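
As a sketch, caching can be requested at creation time (this assumes the ``persist`` parameter described in :ref:`creation`; the location is the example from above):

.. code-block:: sql

CREATE TABLE my_data WITH (
    format = 'csv',
    location = 's3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv',
    persist = True
)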
2 changes: 2 additions & 0 deletions docs/pages/quickstart.rst
@@ -35,6 +35,8 @@ In this example, we do not read in external data, but use test data in the form

df = dask.datasets.timeseries()

Read more on data input in :ref:`data_input`.

2. Data Registration
--------------------

4 changes: 4 additions & 0 deletions docs/pages/sql.rst
@@ -100,6 +100,8 @@ To show column information on a specific table named "df"

SHOW COLUMNS FROM "df"

.. _creation:

Table Creation
--------------

@@ -125,6 +127,8 @@ and format.
The ``persist`` parameter controls whether the data should be cached
or re-read for every SQL query.
The additional parameters are passed to the call to ``read_<format>``.
If you omit the format argument, it will be deduced from the file name extension.
More ways to load data can be found in :ref:`data_input`.
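
For illustration, a minimal sketch (hypothetical path; since the format can be deduced from the ``.csv`` extension, it is omitted):

.. code-block:: sql

CREATE TABLE my_data WITH (
    location = '/some/path/my_file.csv'
)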


Implemented operations
25 changes: 24 additions & 1 deletion tests/integration/test_create.py
@@ -1,5 +1,5 @@
 import pytest
-
+import dask.dataframe as dd
 from pandas.testing import assert_frame_equal
 
 

@@ -26,6 +26,29 @@ def test_create_from_csv(c, df, temporary_data_file):
     assert_frame_equal(df, df)
 
 
+def test_cluster_memory(client, c, df):
+    client.publish_dataset(df=dd.from_pandas(df, npartitions=1))
+
+    c.sql(
+        """
+        CREATE TABLE
+            new_table
+        WITH (
+            location = 'df',
+            format = 'memory'
+        )
+        """
+    )
+
+    return_df = c.sql(
+        """
+        SELECT * FROM new_table
+        """
+    ).compute()
+
+    assert_frame_equal(df, return_df)
+
+
 def test_create_from_csv_persist(c, df, temporary_data_file):
     df.to_csv(temporary_data_file, index=False)
