# Dask dataframes on a cluster

The aim of this afternoon's session is to show you how to go smoothly from 'a pandas dataframe I can handle on my computer' to 'a huge dataframe that I can handle on a cluster of X computers using dask'.

[Dask](http://dask.pydata.org) is a library which provides advanced parallelism for analytics using familiar Python APIs like [pandas](https://pandas.pydata.org), [numpy](https://numpy.org) and [scikit-learn](https://scikit-learn.org).

We'll take a look at how we can scale the groupby/apply approaches we learnt this morning to a bigger dataframe on a cluster.

Note that you need to have a cluster running for this to work. I've got some basic instructions for spinning up a cluster in Google Cloud in `../handouts/running_dask_gloud.md`, but there are a lot of concepts to follow to get this running. If you want to try this locally on your own computer instead, you can install dask and distributed using conda (either in Anaconda Navigator or on the command line with `conda install dask distributed`).
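
If you go the local route, a minimal sketch of starting a stand-in cluster on your own machine might look like the following. The worker and thread counts are assumptions chosen to keep the example light, and `processes=False` runs the workers as threads inside the current Python process rather than as separate processes.

```python
from dask.distributed import Client, LocalCluster

# Assumption: two single-threaded, in-process workers are enough for a
# quick local test; adjust these numbers for your own machine.
cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
client = Client(cluster)

# Submit a trivial task to confirm that the workers respond.
future = client.submit(sum, [1, 2, 3])
total = future.result()
print(total)
```

Everything later in this notebook that takes a `client` should work the same way against a local cluster like this, just with far less memory available than on a real cluster.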

In [2]:
import s3fs

s3 = s3fs.S3FileSystem()
s3.ls('dask-data/nyc-taxi/2015')

['dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-02.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-03.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-04.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-05.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-06.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-07.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-08.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-09.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-10.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-11.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-12.csv',
 'dask-data/nyc-taxi/2015/parquet.gz',
 'dask-data/nyc-taxi/2015/parquet',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.parq']

This data is too large to fit in memory as a pandas dataframe on a single computer. However, it can fit in memory if we break it up into many small pieces and load these pieces onto different computers across a cluster.

We connect a client to our Dask cluster, which is composed of one centralized `dask-scheduler` process and several `dask-worker` processes running across the machines in our cluster.

In [3]:
%%!
# Print the Jupyter notebook, Dask dashboard and Dask scheduler addresses
export NAME="analytics"
export DASK_SCHEDULER=$(kubectl get svc --namespace default ${NAME}-dask-scheduler -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
export DASK_SCHEDULER_UI_IP=$(kubectl get svc --namespace default ${NAME}-dask-scheduler -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
export JUPYTER_NOTEBOOK_IP=$(kubectl get svc --namespace default ${NAME}-dask-jupyter -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
echo http://$JUPYTER_NOTEBOOK_IP:80 -- Jupyter notebook
echo http://$DASK_SCHEDULER_UI_IP:80  -- Dask dashboard
echo http://$DASK_SCHEDULER:8786    -- Dask Client connection

['http://35.201.26.78:80 -- Jupyter notebook',
 'http://35.189.30.21:80 -- Dask dashboard',
 'http://35.189.30.21:8786 -- Dask Client connection']

In [4]:
from dask.distributed import Client

scheduler_ip = "35.189.30.21"
scheduler_port = "8786"

client = Client(f'{scheduler_ip}:{scheduler_port}')
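
Before loading any data it can be worth checking that the client actually sees some workers. The helper below is a hypothetical convenience function (not part of dask) that relies on `Client.nthreads()`, which reports the number of threads on each worker. The connection line in the comment reuses the address from the cell above, with an assumed 10 second connection timeout.

```python
from dask.distributed import Client


def summarize_workers(client):
    """Return (number of workers, total threads) seen by the scheduler."""
    threads_per_worker = client.nthreads()  # {worker address: thread count}
    return len(threads_per_worker), sum(threads_per_worker.values())


# Example usage against the cluster above (assumes it is still reachable):
# client = Client("35.189.30.21:8786", timeout=10)
# print(summarize_workers(client))
```

If the worker count comes back as zero, or the connection times out, later calls such as `df.head()` are likely to fail as well.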

We can use dask to parse the CSVs into a dataframe which looks and feels like a dataframe on our machine but is really being stored on the cluster.

In [7]:
import dask.dataframe as dd

df = dd.read_csv('s3://dask-data/nyc-taxi/2015/*.csv',
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
                 storage_options={'anon': True})
df = client.persist(df)

In [10]:
df.head()

CancelledError: ('from-delayed-46ea7b05ff8a6f822e37775846598836', 0)

distributed.batched - INFO - Batched Comm Closed: in <closed TCP>: TimeoutError: [Errno 60] Operation timed out
