# Dask-SQL: Empowering Pythonistas for <br>Scalable End-to-End Data Engineering and Data Science

<img src='images/med-head.jpg' width=300>

## Who Am I?

### Adam Breindel

__LinkedIn__ - https://www.linkedin.com/in/adbreind<br>
__Email__ - <tt>adbreind@gmail.com</tt><br>
__Twitter__ - <tt>@adbreind</tt>

__What Do I Do?__
* Training Lead at Coiled Computing: https://coiled.io
    * Dask scales Python for data science and machine learning
    * Coiled makes it easy to scale on the cloud
* Consulting on data engineering and machine learning
    * Development
    * Various advisory roles
* 20+ years building systems for startups and large enterprises
* 10+ years teaching front- and back-end technology

__Fun large-scale data projects__
* Streaming neural net + decision tree fraud scoring
* Realtime & offline analytics for banking
* Music synchronization and licensing for networked jukeboxes

__Industries__
* Finance / Insurance
* Travel, Media / Entertainment
* Energy, Government
* Advertising/Social Media, & more

<br>
<br>

---

<br>
<br>

# Basic large-scale enterprise data processing pattern

<img src='images/end2end.png' width=800>
<br>
<br>
Yes, we're missing a lot of important upstream work (data acquisition, ingestion) and downstream work (deploy, monitor), but today we're focusing on *SQL*.

<br>
<br>

---

<br>
<br>

# Let's zoom in on extracting from a data lake/warehouse and transforming

<img src='images/2020.png' width=800>

* There are __other__ tools (Presto/Trino, Spark, etc.) that can help
* But we're *Pythonistas* and maybe not experts in (or interested in) integrating complex JVM-based tools
* And we'd like to ...
  * Use Python together with SQL at scale
  * Create services and tools for our company/team that use SQL
  * Because many more folks know SQL than Python! (I know it's hard to believe, but it's true :)

<br>
<br>

---

<br>
<br>

# We're all happy it's 2021

<img src='images/2021.png' width=800>


<br>
<br>

---

<br>
<br>


# Introducing Dask-SQL
## Adding SQL execution and Hive access to Python!

<img src='images/nils.jpeg' width=300>

### Nils Braun
* Data Engineer for Enabling: Bosch Center for Artificial Intelligence (BCAI)
* https://www.linkedin.com/in/nlb/
* https://github.com/nils-braun

### Dask-SQL

Core features

* SQL parsing, optimization, planning, translation for Dask
* Start with data from...
    * files in the cloud (e.g., S3)
    * any data in Python (e.g., Pandas or Dask Dataframe)
    * modern data catalog/aggregation like Intake (https://github.com/intake/intake)
    * __direct from enterprise data lakes/warehouses: Hive Metastore, Databricks, etc.__
        * Bring the SQL integration power of Spark right into the Python/Dask world
* Query cached datasets to leverage the speed of a large distributed memory pool

Bonus features
* user-defined functions
* a SQL server
* ML in SQL
* a command-line client
* more in the works!

Learn more...
* Homepage: https://nils-braun.github.io/dask-sql/
* Docs: https://dask-sql.readthedocs.io/en/latest/
* Source: https://github.com/nils-braun/dask-sql

<br>
<br>

---

<br>
<br>

## Before we dive into code ... a little clarification: data lakes

If you haven't worked a lot in the large-scale data space, it can be a bit confusing why we need a Dask-SQL project. Common questions include...

How is this different from...
* Dask `read_sql_table`? 
* Pandas `read_sql`, `read_sql_table`, or `read_sql_query`?
* SQLAlchemy
* etc.

The fundamental difference is: __those other approaches pass your query to a database system which already understands SQL, can execute a query, and has control over your data__

__In enterprise data lakes, that "database" likely does not exist.__ Instead, you may have huge collections of files, in a variety of formats, with no query engine, and no process which has "control" over your data.

You may not even have a data catalog. In other cases, you may have a catalog, but it is tied to a Hadoop/JVM-based system like Hive or Spark.

In these data lake systems, all of the `read_sql` techniques above may not work at all, or may require you to pass your logic through to Hive/Spark/etc., requiring you to understand, use, and tune those systems before you can even start your work in Python.

The goal of Dask-SQL is to allow you to formulate a SQL query against arbitrary files & formats, and execute that query at large scale with Dask.
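
To make that distinction concrete, here's a minimal sketch using only the Python standard library. The `readings` table, its columns, and the sample rows are made up for illustration. In the first case a database engine (SQLite) owns the data and runs the SQL for us; in the second, the same data is just a CSV file, so nothing can execute a query until we bring our own engine. Here that "engine" is a hand-written loop, which is roughly the job Dask-SQL takes over at scale.

```python
import csv
import sqlite3
import tempfile
from collections import defaultdict
from pathlib import Path

# Made-up sample data: (sensor, reading)
rows = [("a", 1.0), ("a", 3.0), ("b", 10.0)]

# Case 1: a database engine holds the data and executes the SQL itself.
# This is the situation read_sql-style tools assume.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE readings (sensor TEXT, reading REAL)")
conn.executemany("INSERT INTO readings VALUES (?, ?)", rows)
db_result = dict(
    conn.execute("SELECT sensor, AVG(reading) FROM readings GROUP BY sensor")
)
conn.close()

# Case 2: the same data as a bare file. There is no engine to send SQL to,
# so the GROUP BY / AVG logic has to be supplied by whoever reads the file.
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "readings.csv"
    with path.open("w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["sensor", "reading"])
        writer.writerows(rows)

    sums = defaultdict(float)
    counts = defaultdict(int)
    with path.open(newline="") as f:
        for record in csv.DictReader(f):
            sums[record["sensor"]] += float(record["reading"])
            counts[record["sensor"]] += 1

file_result = {sensor: sums[sensor] / counts[sensor] for sensor in sums}

print(db_result)
print(file_result)
```

Both approaches give the same averages, but only the first one let us write SQL. With Dask-SQL, the second situation gets a SQL front end too.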

<br>
<br>

---

<br>
<br>

## It's coding time!

We'll demo three key approaches here:

1. Creating a Dask Dataframe -- a lazy, distributed data structure -- over a set of files, and then using Dask-SQL to query the data

2. Creating a Dask-SQL table completely within SQL, and querying that -- an approach that will be very helpful when working with your SQL analyst friends

3. Using Dask-SQL to access tables *already defined in the Hive catalog ("metastore")* but querying the underlying files with Dask -- an incredibly valuable missing link for Python data folks working within orgs that rely on Hive to catalog their data.

In [None]:
from dask.distributed import Client

client = Client()

client

In [None]:
from dask_sql import Context

c = Context()

In [None]:
import dask.dataframe as dd

df = dd.read_csv('data/powerplant.csv')

df

In [None]:
c.create_table("powerplant", df)

result = c.sql('SELECT * FROM powerplant')

result

In [None]:
type(result)

In [None]:
result.compute()

In [None]:
c.sql('SELECT * FROM powerplant', return_futures=False) # run immediately -- beware of large result sets!

In [None]:
type(c.sql('SELECT * FROM powerplant', return_futures=False))

In [None]:
query = '''
SELECT
    FLOOR("AT") AS temp, AVG("PE") AS output
FROM
    powerplant
GROUP BY 
    FLOOR("AT")
'''

result = c.sql(query)

result

In [None]:
result.compute().plot.scatter('temp','output')

# hint: if you're not totally convinced the computation is happening in Dask, look at the Dask Task Stream dashboard!

Maybe we could build a successful model with this data ... in fact, we could do it with any combination of
* Data prep in SQL, training/prediction in Python
* Training in Python, prediction in SQL
* Everything (!) in SQL
* Sound interesting? Check it out: https://dask-sql.readthedocs.io/en/latest/pages/machine_learning.html

### What about "creating the table completely in SQL"?

First, let's go "full SQL" so we don't even need to wrap our queries in Python...

In [None]:
c.ipython_magic()

In [None]:
%%sql

CREATE TABLE allsql WITH (
    format = 'csv',
    location = 'data/powerplant.csv' -- any Dask-accessible source or format (cloud/S3/..., parquet/ORC/...)
)

In [None]:
%%sql

SELECT
    FLOOR("AT") AS temp, AVG("PE") AS output
FROM
    allsql
GROUP BY 
    FLOOR("AT")
LIMIT 10

### Let's see that Hive catalog integration!

*note: this demo will not run in the standalone binder notebook available after PyCon, as it relies on a Hive server which is not configured in that container*

In [None]:
from pyhive.hive import connect

cursor = connect("localhost", 10000).cursor()

c.create_table("my_diamonds", cursor, hive_table_name="diamonds")

Here's the magic...
* If you look at the Hive Server web UI, you'll see a query just ran to get schema info on the `diamonds` table
* But in the following queries
    * Data is accessed directly from the underlying files
    * No Hive queries are run
    * All compute is done in Dask/Python

In [None]:
%%sql

SELECT * FROM my_diamonds LIMIT 10

In [None]:
query = '''
SELECT FLOOR(10*carat)/10 AS carat, AVG(price) AS price, COUNT(1) AS num 
FROM my_diamonds
GROUP BY FLOOR(10*carat)
'''

data = c.sql(query).compute()

data.plot.scatter('carat', 'price')
data.plot.bar('carat', 'num')

## A Quick Look at How Dask-SQL Works

* Locate the source data
    * Hive, Intake, Databricks catalog integration
    * Files or Python data provided by user


* Prepare the query using Apache Calcite
    * Parse SQL
    * Analyze (check vs. schema, etc.)
    * Optimize


* Create execution plan
    * Take logical relational operators (`SELECT`/project, `WHERE`/filter, `JOIN`, etc.) 
    * Convert into Dask Dataframe API calls (`query`, `merge`, etc.)


* Then either...
    * Return a handle to the Dask Dataframe of results (recall this is a virtual Dataframe, so no execution yet)
    * or
    * Compute (materialize) the resulting dataframe and return the result as a Pandas Dataframe
    
More detail at https://dask-sql.readthedocs.io/en/latest/pages/how_does_it_work.html
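
As a rough picture of that "convert into Dataframe API calls" step, here's the earlier temperature/output query translated by hand. This is a sketch, not the plan Dask-SQL actually generates. It uses Pandas with a few made-up rows so it runs without a cluster, and it assumes the Dask Dataframe versions of these calls (`assign`, `groupby`, `mean`) behave the same way.

```python
import numpy as np
import pandas as pd

# Made-up rows standing in for data/powerplant.csv
powerplant = pd.DataFrame({
    "AT": [10.2, 10.8, 11.5],
    "PE": [480.0, 482.0, 470.0],
})

# SELECT FLOOR("AT") AS temp, AVG("PE") AS output
# FROM powerplant
# GROUP BY FLOOR("AT")
translated = (
    powerplant
    .assign(temp=np.floor(powerplant["AT"]))  # FLOOR("AT") AS temp
    .groupby("temp")["PE"]                    # GROUP BY FLOOR("AT")
    .mean()                                   # AVG("PE")
    .rename("output")                         # AS output
    .reset_index()                            # turn the group key back into a column
)

print(translated)
```

Each SQL clause maps onto one Dataframe call, which is why the result of `c.sql(...)` can stay lazy: it's just another Dask Dataframe built from calls like these.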

## Some Practical Details

### Installing Dask-SQL

The recommended approach is to install via conda from conda-forge -- this will include all dependencies like the JVM, and avoid conflicts by keeping everything within a conda environment.

There are also a few other options: more details at https://dask-sql.readthedocs.io/en/latest/pages/installation.html

### Supported SQL Operators

Dask-SQL is a young project, so it does not yet support all of SQL.

More detail on
* Query support https://dask-sql.readthedocs.io/en/latest/pages/sql/select.html
* Table creation https://dask-sql.readthedocs.io/en/latest/pages/sql/creation.html
* ML via SQL https://dask-sql.readthedocs.io/en/latest/pages/sql/ml.html

### How to Contribute

Source code and info on installing for development are at https://github.com/nils-braun/dask-sql

Check issues -- or file a new bug -- at https://github.com/nils-braun/dask-sql/issues

And there's even a "good first issue" list at https://github.com/nils-braun/dask-sql/contribute

# Thank You!