# dask-sql
### A SQL Query Layer for Dask

## Introduction

`dask-sql` adds a SQL query layer on top of the Dask distributed Python library, which allows you to query your big and small data with SQL and still use the great power of the Dask ecosystem.
It lets you combine the strengths of SQL and of Python/Dask in a single workflow.
See the [documentation](https://dask-sql.readthedocs.io/) for more information.

## Starting dask-sql

There are two ways to send your SQL queries to `dask-sql`:
* from a Python notebook or script, such as the one you currently have open
* by running the [dask-sql Server](https://dask-sql.readthedocs.io/en/latest/pages/server.html) as a standalone application and connecting to it, e.g. from your BI tool

We will stick with the first possibility in this notebook, but all SQL commands shown here can also be run via the SQL server.

Before we start, we need to import `dask-sql` and create a `Context`, which keeps track of all currently registered data tables.
We will also create a small local Dask cluster. This step is optional, but it gives us more debugging options (e.g. the Dask dashboard).
If you have a large computation cluster, you can connect to it in this step (see the [Dask setup documentation](https://docs.dask.org/en/latest/setup.html)).

In [None]:
from dask_sql import Context
from dask.distributed import Client

client = Client()
c = Context()

In [None]:
client

You are now ready to query with SQL!

In [None]:
c.sql("""
    SELECT 42 AS "the answer"
""", return_futures=False)

`dask-sql` also provides an IPython magic as a shortcut for this:

In [None]:
c.ipython_magic(auto_include=True)

With the magic registered, the query above can be written as a `%%sql` cell instead:

In [None]:
%%sql
SELECT 42 AS "the answer"

## Data Input

### 1. From a Dask Dataframe via Python

In [None]:
import dask.dataframe as dd

df = dd.read_csv("./iris.csv")

In [None]:
df.head(10)

In [None]:
c.create_table("iris", df)

### 2. From an external data source via SQL

In [None]:
%%sql
CREATE OR REPLACE TABLE iris
WITH (
    location = 'file://./iris.csv',
    format = 'csv',
    persist = True
)

Besides local files, many other data sources are supported, for example:

* s3, azure, dbfs (new!), gs, hdfs, ...
* hive (experimental), databricks (experimental), intake
* already loaded data persisted in your Dask cluster

More information can be found in the [data input documentation](https://dask-sql.readthedocs.io/en/latest/pages/data_input.html).

### 3. As materialized queries

In [None]:
%%sql
CREATE OR REPLACE TABLE second_iris
AS SELECT * FROM iris

### 4. From the notebook

Because we registered the IPython magic with `c.ipython_magic(auto_include=True)`, we can even reference any dataframe created in the notebook directly in our queries.

In [None]:
my_data_frame = dd.read_csv("./iris.csv")

In [None]:
%%sql
SELECT * FROM my_data_frame LIMIT 10

Please note that with this setting, a dataframe in the notebook automatically overrides any registered table with the same name.

## Metadata Information

In [None]:
%%sql
SHOW TABLES FROM "schema"

In [None]:
%%sql
SHOW COLUMNS FROM "iris"

In [None]:
%%sql
DESCRIBE iris

In [None]:
%%sql
DESCRIBE TABLE iris

## Data Query

You can run standard SQL `SELECT` statements in `dask-sql`, with the typical components of the SQL language.
More information can be found in the [SQL reference](https://dask-sql.readthedocs.io/en/latest/pages/sql.html).
`dask-sql` roughly follows the PrestoSQL conventions (e.g. for quoting identifiers).

<div class="alert alert-info">
    
#### Note
    
Not all SQL operators are implemented in `dask-sql` yet.
    
</div>

In [None]:
%%sql
SELECT * 
FROM iris
LIMIT 10

In [None]:
%%sql
SELECT 
    sepal_length + sepal_width AS "sum", 
    SIN(petal_length) AS "sin"
FROM iris
LIMIT 10
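
To make the semantics of the query above concrete, here is a rough pandas equivalent on a tiny hand-written frame that only borrows the column names of `iris.csv` (the values are illustrative). It is a sketch of what the SQL computes, not how `dask-sql` executes it: each aliased expression becomes one output column computed element-wise, and `LIMIT 10` corresponds to `head(10)`.

```python
import numpy as np
import pandas as pd

# A few hand-written rows with the same columns as iris.csv (illustrative values)
iris = pd.DataFrame({
    "sepal_length": [5.1, 7.0, 6.3],
    "sepal_width": [3.5, 3.2, 3.3],
    "petal_length": [1.4, 4.7, 6.0],
    "petal_width": [0.2, 1.4, 2.5],
    "species": ["setosa", "versicolor", "virginica"],
})

# Each aliased SELECT expression becomes one output column, computed
# element-wise over the input columns; LIMIT 10 corresponds to head(10)
projected = pd.DataFrame({
    "sum": iris["sepal_length"] + iris["sepal_width"],
    "sin": np.sin(iris["petal_length"]),
}).head(10)
print(projected)
```

Keep in mind that, without an `ORDER BY`, standard SQL does not guarantee which rows a `LIMIT` returns.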

In [None]:
%%sql
SELECT 
    species,
    AVG(sepal_length) AS sepal_length, 
    AVG(sepal_width) AS sepal_width
FROM iris
GROUP BY species
LIMIT 10
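
For comparison, the aggregation above roughly corresponds to a pandas `groupby` with named aggregation. The small frame below is hand-written and only reuses the iris column names; it is an illustration of the semantics, not of how `dask-sql` plans the query.

```python
import pandas as pd

# Hand-written rows with the iris.csv column names (illustrative values)
iris = pd.DataFrame({
    "sepal_length": [5.1, 4.9, 7.0, 6.4],
    "sepal_width": [3.5, 3.0, 3.2, 3.2],
    "species": ["setosa", "setosa", "versicolor", "versicolor"],
})

# GROUP BY species with AVG(...) AS ... maps to groupby + named aggregation
averages = (
    iris.groupby("species", as_index=False)
    .agg(
        sepal_length=("sepal_length", "mean"),
        sepal_width=("sepal_width", "mean"),
    )
    .head(10)
)
print(averages)
```

One difference worth knowing: pandas sorts the group keys by default, whereas SQL returns groups in no particular order unless you add an `ORDER BY`.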

In [None]:
%%sql
WITH maximal_values AS (
    SELECT 
        species, 
        MAX(sepal_length) AS sepal_length
    FROM iris
    GROUP BY species
)
SELECT lhs.*
FROM iris AS lhs 
JOIN maximal_values AS rhs ON lhs.species = rhs.species AND lhs.sepal_length = rhs.sepal_length
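
The query above finds, for every species, the row(s) with the largest sepal length. A rough pandas sketch of the same logic, on hand-written illustrative rows, makes the two steps explicit: the `WITH` clause is a per-group maximum, and the join keeps only the rows that match it.

```python
import pandas as pd

# Hand-written rows with the iris.csv column names (illustrative values)
iris = pd.DataFrame({
    "sepal_length": [5.1, 4.9, 7.0, 6.4, 6.3, 5.8],
    "sepal_width": [3.5, 3.0, 3.2, 3.2, 3.3, 2.7],
    "species": ["setosa", "setosa", "versicolor", "versicolor", "virginica", "virginica"],
})

# The CTE: maximal sepal_length per species
maximal_values = iris.groupby("species", as_index=False)["sepal_length"].max()

# The JOIN on (species, sepal_length) keeps only rows that reach their species maximum.
# Both join keys have the same names on each side, so the result has exactly the
# iris columns, like lhs.* in the SQL
result = iris.merge(maximal_values, on=["species", "sepal_length"], how="inner")
print(result)
```

As in the SQL version, if several rows of a species share the maximal value, all of them are returned.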

In [None]:
print(c.explain("""
    WITH maximal_values AS (
        SELECT 
            species, 
            MAX(sepal_length) AS sepal_length
        FROM iris
        GROUP BY species
    )
    SELECT 
        lhs.*
    FROM iris AS lhs 
    JOIN maximal_values AS rhs
    ON lhs.species = rhs.species 
        AND lhs.sepal_length = rhs.sepal_length
"""))

## Custom Functions

In [None]:
import numpy as np

def volume(length, width):
    return (width / 2) ** 2 * np.pi * length

# As SQL is a typed language, we need to specify all types 
c.register_function(volume, "IRIS_VOLUME", 
                    parameters=[("length", np.float64), ("width", np.float64)], 
                    return_type=np.float64)
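
As far as we understand, `dask-sql` by default calls a registered function with whole columns (pandas Series) rather than one row at a time, which is why `volume` is written with plain arithmetic and NumPy functions that work element-wise. The sketch below simply checks that the same function works on scalars and on Series; it does not use `dask-sql` itself.

```python
import numpy as np
import pandas as pd

def volume(length, width):
    # Same formula as above: a cylinder of diameter `width` and height `length`
    return (width / 2) ** 2 * np.pi * length

# Works on plain scalars ...
print(volume(2.0, 2.0))  # (2/2)**2 * pi * 2 = 2 * pi

# ... and element-wise on whole columns, without a Python-level loop
lengths = pd.Series([5.1, 7.0])
widths = pd.Series([3.5, 3.2])
volumes = volume(lengths, widths)
print(volumes)
```

A function that only handles single values (for example one using `if` on its arguments) would fail or misbehave when handed a whole column.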

In [None]:
%%sql
SELECT 
    sepal_length, sepal_width, IRIS_VOLUME(sepal_length, sepal_width) AS volume
FROM iris
LIMIT 10

## Machine Learning

In [None]:
df.species.head(100)

In [None]:
%%sql
CREATE OR REPLACE TABLE enriched_iris AS (
    SELECT 
        sepal_length, sepal_width, petal_length, petal_width,
        CASE 
            WHEN species = 'setosa' THEN 0 ELSE CASE 
            WHEN species = 'versicolor' THEN 1
            ELSE 2 
        END END AS "species", 
        IRIS_VOLUME(sepal_length, sepal_width) AS volume
    FROM iris 
)
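
The nested `CASE` above turns the species names into the numeric class labels 0, 1 and 2 that the classifier will be trained on. A rough NumPy equivalent (an illustration only) shows how the conditions are checked in order, with everything else falling through to the final `ELSE`:

```python
import numpy as np
import pandas as pd

species = pd.Series(["setosa", "versicolor", "virginica", None])

# CASE WHEN ... THEN 0 ELSE CASE WHEN ... THEN 1 ELSE 2 END END:
# conditions are checked in order and the first match wins
codes = np.select(
    [species == "setosa", species == "versicolor"],
    [0, 1],
    default=2,
)
print(codes)  # [0 1 2 2]
```

Note that the `ELSE` branch catches anything that is neither `setosa` nor `versicolor`, including missing values and misspelled labels, so those rows silently end up in class 2.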

In [None]:
%%sql
CREATE OR REPLACE TABLE training_data AS (
    SELECT 
        *
    FROM enriched_iris
    TABLESAMPLE BERNOULLI (50)
)
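
`TABLESAMPLE BERNOULLI (50)` keeps every row independently with a probability of 50 %, so the size of `training_data` varies from run to run and is only approximately half of `enriched_iris`. The following NumPy/pandas sketch illustrates that sampling scheme on a synthetic table; it is not how `dask-sql` implements it internally.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(seed=0)
table = pd.DataFrame({"row_id": np.arange(1000)})

# BERNOULLI (50): every row is kept independently with probability 0.5
keep = rng.random(len(table)) < 0.50
training = table[keep]
# The complementary rows are disjoint from the sample and could serve as a hold-out set
holdout = table[~keep]

print(len(training), len(holdout))  # sizes vary around 500 and add up to 1000
```

By contrast, pandas' `DataFrame.sample(frac=0.5)` draws a fixed number of rows, which is a different sampling scheme.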

In [None]:
%%sql
SELECT * FROM training_data

In [None]:
%%sql
CREATE OR REPLACE MODEL my_model WITH (
    model_class = 'dask_ml.xgboost.XGBClassifier',
    target_column = 'species',
    num_class = 3
) AS (
    SELECT * FROM training_data
)

In [None]:
%%sql
SHOW MODELS

In [None]:
%%sql
DESCRIBE MODEL my_model

In [None]:
%%sql
SELECT
    *
FROM PREDICT(
    MODEL my_model,
    SELECT * FROM enriched_iris
)
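
Conceptually, `PREDICT` runs the stored model on the rows of the given query and returns them together with a new `target` column holding the predictions (the column name is the one used in the queries below). The pandas sketch below mimics this with a hypothetical `ToyPetalModel` (a hard-coded rule, not a trained model) and a hypothetical `predict_table` helper. We assume here that only the feature columns used for training are passed to the model; check the `dask-sql` documentation for the exact behavior.

```python
import numpy as np
import pandas as pd

class ToyPetalModel:
    """Hypothetical stand-in for a fitted classifier (anything with .predict(X))."""

    def predict(self, X):
        # Toy rule, not a trained model: petal_length < 2.5 -> 0, < 5.0 -> 1, else 2
        return np.digitize(X["petal_length"].to_numpy(), bins=[2.5, 5.0])

def predict_table(model, table, feature_columns):
    """Hypothetical helper: keep all input columns and append predictions as "target"."""
    result = table.copy()
    result["target"] = model.predict(table[feature_columns])
    return result

# Hand-written rows in the shape of enriched_iris (illustrative values, volume omitted)
enriched = pd.DataFrame({
    "sepal_length": [5.1, 7.0, 6.3],
    "sepal_width": [3.5, 3.2, 3.3],
    "petal_length": [1.4, 4.7, 6.0],
    "petal_width": [0.2, 1.4, 2.5],
    "species": [0, 1, 2],
})
features = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
predictions = predict_table(ToyPetalModel(), enriched, features)
print(predictions)
```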

In [None]:
%%sql
CREATE OR REPLACE TABLE results AS
SELECT
    *
FROM PREDICT(
    MODEL my_model,
    TABLE enriched_iris
)

In [None]:
%%sql
SELECT
    target,
    species,
    COUNT(*)
FROM
    results
GROUP BY target, species
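
The counts per `(target, species)` pair form a confusion matrix: rows where the prediction equals the true label are correct. The sketch below derives an accuracy from counts of that shape, using hypothetical predictions rather than the model's real output. Keep in mind that `training_data` was sampled from `enriched_iris` and we predicted on all of `enriched_iris`, so roughly half of the rows were seen during training; an accuracy computed from these counts is therefore likely optimistic.

```python
import pandas as pd

# Hypothetical predictions ("target") and true labels ("species"), not real model output
results = pd.DataFrame({
    "target":  [0, 0, 1, 1, 2, 2],
    "species": [0, 0, 1, 2, 2, 2],
})

# Same shape as the query above: one row per (target, species) pair with its count
counts = results.groupby(["target", "species"]).size().reset_index(name="number")

# Pairs with target == species are correct predictions (the confusion-matrix diagonal)
correct = counts.loc[counts["target"] == counts["species"], "number"].sum()
accuracy = correct / counts["number"].sum()
print(counts)
print(f"accuracy = {accuracy:.3f}")  # 5 of 6 rows correct
```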

In [None]:
t = c.sql("""
    SELECT
        target,
        species,
        COUNT(*) AS "number"
    FROM
        results
    GROUP BY target, species
""").compute() 
t.set_index(["target", "species"]).unstack("species").number.plot.bar()
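
The last line reshapes the long result (one row per pair) into a wide matrix with one row per predicted class and one column per true species, which `plot.bar()` then draws as one group of bars per `target`. The sketch below reproduces just the reshaping on illustrative counts, without plotting:

```python
import pandas as pd

# Long-format counts like `t` above (illustrative numbers)
t = pd.DataFrame({
    "target":  [0, 1, 1, 2],
    "species": [0, 1, 2, 2],
    "number":  [2, 1, 1, 2],
})

# Move "species" from the row index into the columns; ["number"] selects the
# same columns as .number in the cell above
wide = t.set_index(["target", "species"]).unstack("species")["number"]

# Pairs that never occur become NaN; fill them with 0 for a complete matrix
wide = wide.fillna(0).astype(int)
print(wide)
```

Without the `fillna`, combinations that never occur are simply missing from the bar chart.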