# Dask

### Options & Imports

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import pandas as pd
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))
pd.set_option('display.float_format', lambda x: '%.3f' % x)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

In [None]:
# Import Libraries & Parameters
import os
import pandas as pd
import numpy as np
from config import gcp_token, bq_db, project_id

In [None]:
# Set GCP credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = gcp_token

Dask is an HPC (High Performance Computing) library for Python that works alongside familiar libraries such as NumPy and Pandas. It was created at Continuum Analytics (now Anaconda, Inc.), the same people responsible for the Anaconda distribution of Python that we all know and love!

It can run on your personal machine, a single VM, or a cluster of VMs.

It's quite easy to get started with and you don't need to learn a new API or language to use it.

There are two ways in which it operates:
* Distributed Data
* Parallel Processing

#### Distributed Data
Dask provides high-level Array, Bag, and DataFrame collections that mimic NumPy, lists, and Pandas but can operate in parallel on datasets that don't fit into main memory. Dask's high-level collections are alternatives to NumPy and Pandas for large datasets. This is very similar to how Spark Dataframes operate.
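
As a rough illustration of the collections idea (this tutorial doesn't otherwise use them), the sketch below builds a small chunked Dask array. The shape, the chunk sizes, and the `scheduler="synchronous"` argument are illustrative choices rather than anything taken from this notebook.

```python
import dask.array as da

# A 4x4 array of ones, stored as four separate 2x2 blocks
x = da.ones((4, 4), chunks=(2, 2))

# Nothing is computed yet; this only describes the work to do per block
lazy_total = x.sum()

# Run the computation; "synchronous" keeps it in the current thread for simplicity
total = lazy_total.compute(scheduler="synchronous")

print(x.numblocks)  # number of blocks along each axis
print(total)
```

With a real dataset the blocks would be much larger, and only a few would need to be in memory at any one time.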

#### Parallel Processing
Dask provides dynamic task schedulers that execute task graphs in parallel. These execution engines power the high-level collections mentioned above but can also power custom, user-defined workloads. These schedulers are low-latency (around 1ms) and work hard to run computations in a small memory footprint. Dask's schedulers are an alternative to direct use of threading or multiprocessing libraries in complex cases or other task scheduling systems like Luigi or IPython parallel.

The above was shamelessly stolen from the [Dask Tutorial](https://github.com/dask/dask-tutorial).

This tutorial focuses on **Parallel Processing**.

We'll start by initialising the Dask `Client()`:

In [None]:
from dask.distributed import Client
import multiprocessing

print(f'Processors: {multiprocessing.cpu_count()}')

n_workers = 4
threads_per_worker = 3

client = Client(
    n_workers=n_workers,
    threads_per_worker=threads_per_worker,
)
client

We can use the built-in `multiprocessing` library to work out how many processors our machine has available. Here `n_workers` is set by hand, but you could pass `multiprocessing.cpu_count()` to fill this value automatically. Generally 3 to 4 threads per worker is a good balance.

With 4 workers and 3 threads each, this gives us 12 threads. This means instead of doing 1 thing at a time, Dask can now run up to 12 tasks at a time. Cool!
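
The sizing arithmetic is just workers multiplied by threads per worker. A minimal sketch, where `total_threads` is a hypothetical helper (not part of Dask) and the second call depends on whatever machine you run it on:

```python
import multiprocessing


def total_threads(n_workers, threads_per_worker):
    """Number of tasks that can run at once across all workers."""
    return n_workers * threads_per_worker


# The values used in the Client() cell above
print(total_threads(4, 3))

# Sizing from the machine instead (the result depends on your hardware)
auto_workers = multiprocessing.cpu_count()
print(total_threads(auto_workers, 3))
```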

Calling `client` brings up some information including a link to the dashboard. This shows us the status of the processing that Dask is doing.

### Intro to Delayed

Again, this is shamelessly stolen from the [Dask Tutorial](https://github.com/dask/dask-tutorial).

We can start by looking at how Python operates normally with a couple of functions:

In [None]:
from time import sleep

def inc(x):
    sleep(1)
    return x + 1

def add(x, y):
    sleep(1)
    return x + y

We'll run these as follows:

In [None]:
%%time
# This takes three seconds to run because we call each function sequentially, one after the other

x = inc(1)
y = inc(2)
z = add(x, y)

z

Python has calculated `x`, then `y`, then `z`, giving us a 3 second runtime.

We can parallelise this using Dask's **delayed** Higher Order Function. A Higher Order Function is a function that takes a function as an argument, or returns a function. You can read a good explanation of this concept [here](https://www.tutorialspoint.com/functional_programming/functional_programming_higher_order_functions.htm).
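
To make the idea concrete, here is a toy Higher Order Function written in plain Python. It is only a stand-in to show the shape of the pattern and is not how Dask implements `delayed`; the name `lazy` is made up for this example.

```python
def lazy(func):
    """Toy higher-order function: takes a function and returns a new function."""
    def wrapper(*args, **kwargs):
        # Instead of running func now, hand back a zero-argument function
        # that will run it later
        def run_later():
            return func(*args, **kwargs)
        return run_later
    return wrapper


def inc(x):
    return x + 1


pending = lazy(inc)(1)  # nothing has been calculated yet
result = pending()      # inc(1) runs only now

print(result)
```

Note the double call in `lazy(inc)(1)`: the first call wraps the function, the second supplies its arguments. `delayed(inc)(1)` is written the same way.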

We can delay our functions as follows:


In [None]:
from dask import delayed, compute

In [None]:
%%time

x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)

z

You'll notice that instead of a result, we get something like: `Delayed('add-df0666a2-e0b0-474d-a3aa-9377ac421f7a')`

This is because when we delay a function, we don't actually execute it. All Dask has done at this point is make a plan for how it's going to execute our functions.
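
One way to convince yourself that nothing has run is to record each call in a list. This is a small sketch: the `calls` list and `scheduler="synchronous"` are assumptions made for the demonstration (the synchronous scheduler runs in the current process, so the list is visible afterwards).

```python
from dask import delayed

calls = []  # records every time inc() actually executes


def inc(x):
    calls.append(x)
    return x + 1


lazy_value = delayed(inc)(1)
calls_before = list(calls)  # still empty: only a plan exists so far

result = lazy_value.compute(scheduler="synchronous")
calls_after = list(calls)   # inc() has now run once

print(calls_before, calls_after, result)
```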

We can check out this plan (a graph) by calling the `visualize()` method on our output `z` object:

In [None]:
z.visualize()

To execute the functions we have to call the `compute()` function on our `z` object:

In [None]:
%%time

# compute() returns a tuple (one result per argument), so take the single value
z = compute(z)[0]

We can see that this ran in about 2 seconds, a second quicker, because Dask executed `x` and `y` at the same time in parallel using different workers.
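
The 2 second figure follows from the shape of the graph: `z` has to wait for `x` and `y`, but `x` and `y` don't depend on each other. The sketch below is an idealised model in plain Python (it ignores scheduler overhead and assumes enough free workers), and the names `durations`, `depends_on` and `finish_time` are made up for the example.

```python
# Seconds each task takes, and which tasks it must wait for
durations = {"x": 1, "y": 1, "z": 1}
depends_on = {"x": [], "y": [], "z": ["x", "y"]}


def finish_time(task):
    """Earliest time a task can finish if independent tasks run in parallel."""
    start = max((finish_time(dep) for dep in depends_on[task]), default=0)
    return start + durations[task]


sequential_runtime = sum(durations.values())  # one task after another
parallel_runtime = finish_time("z")           # longest chain of dependencies

print(sequential_runtime, parallel_runtime)
```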

### Delayed in Practice: Parallelising groupby

One very obvious use case for Dask is to speed up `groupby` workflows that apply a custom Python function to each group, as Pandas processes the groups one at a time and this can run very slowly.

We'll import some BigQuery data: 

TODO: Change this to a csv

In [None]:
# Our SQL query
query = f'SELECT * FROM {bq_db}.merged'

# Import the data from BQ
df = pd.read_gbq(query=query, project_id=project_id)

# Transform the date column
df['date'] = df['date'].dt.tz_localize(None)

# Show the df
df.head()

We'll create a sample DataFrame containing the records for the top 300 items (ranked by `target`) in the BigQuery data as follows:

In [None]:
df = df.sort_values('target', ascending=False)

# Take the first 300 unique item ids (these are the groups we build time series for)
uid_list = df['item_id'].unique().tolist()[:300]

df_sm = df[
    df['item_id'].isin(uid_list)
]

In [None]:
len(uid_list)

We're going to build a time series for each of our 300 top items using Facebook's Prophet library.

We'll start by making a function to build an individual time series from a DataFrame, which we'll wrap with `delayed` later:

In [None]:
from fbprophet import Prophet

def create_single_time_series(df):
    """
    Creates a Time Series using Facebook Prophet. 
    
    Applied via groupby.
    """
    
    history = df[df['y'].notnull()].copy()
    
    # Only execute if there's more than 1 record
    if history.shape[0] >= 2:
        # Get the UID to append to the predictions
        # Use the function argument, not the global df_ts, so each group gets its own id
        uid_value = df['item_id'].iloc[0]

        model = Prophet(
            yearly_seasonality=True,
            weekly_seasonality=False,
            daily_seasonality=False,
            n_changepoints=1
        )
        
        # Add monthly seasonality
        model.add_seasonality(
            name='monthly',
            period=30.5,
            fourier_order=5
        )
    
        model.fit(df, iter=1000)

        # Make a future dataframe to put predictions into
        df_future = model.make_future_dataframe(periods=1)

        # Generate predictions
        df_preds = model.predict(df_future)

        # Add UID
        df_preds['item_id'] = uid_value

        # Return as a list of dicts since it's quicker to combine dicts than dataframes
        dict_preds = df_preds.to_dict(orient='records')
        
    else:
        dict_preds = []
    
    return dict_preds

Firstly, Prophet is very good for programmatically generating lots of time series at once. There are both Python and R implementations and you can check out an overview [here](https://facebook.github.io/prophet/docs/quick_start.html).

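Secondly, note that the function itself isn't decorated with `@delayed`. Using the decorator gives exactly the same outcome as calling `delayed()` on a function (it is Python's syntax for applying a Higher Order Function), but leaving it off lets us run the same function with and without Dask below.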
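
For reference, the decorator form and the call form can be compared side by side. This is a minimal sketch with two made-up functions, `double` and `triple`; `scheduler="synchronous"` is only there so the example runs without a `Client`.

```python
from dask import compute, delayed


@delayed
def double(x):
    return 2 * x


def triple(x):
    return 3 * x


a = double(4)           # decorated: calling it returns a Delayed object
b = delayed(triple)(4)  # undecorated: wrap it at call time instead

# compute() accepts several Delayed objects and returns a tuple of results
results = compute(a, b, scheduler="synchronous")

print(results)
```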

Lastly, note that we transform our dataframe into a list of dictionaries. This is because it's quicker and easier to make a new dataframe from 300 lists of dictionaries than it is to concatenate 300 dataframes.
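
The two ways of combining results can be compared on a tiny example. The frames below are invented stand-ins for Prophet output, and the sketch only checks that both routes give the same DataFrame; it makes no claim about timings.

```python
from itertools import chain

import pandas as pd

# Three small per-item result frames (made-up values)
frames = [
    pd.DataFrame({"item_id": [name, name], "yhat": [base, base + 0.5]})
    for name, base in [("a", 1.0), ("b", 2.0), ("c", 3.0)]
]

# Route 1: convert each frame to a list of dicts, flatten, build one DataFrame
records_per_item = [frame.to_dict(orient="records") for frame in frames]
flat_records = list(chain.from_iterable(records_per_item))
from_records = pd.DataFrame(flat_records)

# Route 2: concatenate the frames directly
from_concat = pd.concat(frames, ignore_index=True)

print(from_records.equals(from_concat))
```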

## 300 Time Series in Dask

In [None]:
%%time

# Only keep necessary columns and rename time series and target to 'ds' and 'y'
df_ts = (
    df_sm[['item_id', 'date', 'target']]
    .rename({
        'date': 'ds',
        'target': 'y'
    }, axis=1)
)

# Fill any NaN values
df_ts['y'] = df_ts['y'].fillna(0)

# Create an empty list to hold our results
ts_output = []

# Group the DataFrame by unique id
df_ts_gp = df_ts.groupby('item_id')

for group in df_ts_gp.groups:
    # Get the group (note that returned df is ungrouped)
    df_ts = df_ts_gp.get_group(group)
    ts_dict = delayed(create_single_time_series)(df_ts)
    ts_output.append(ts_dict)
    
# Execute the function
ts_output = compute(ts_output)[0]
    
df_list = []

for item in ts_output:
    for record in item:
        df_list.append(record)
        
df_dask = pd.DataFrame(df_list).sort_values(['item_id', 'ds'])

## 300 Time Series in Pandas

In [None]:
%%time

# Only keep necessary columns and rename time series and target to 'ds' and 'y'
df_ts = (
    df_sm[['item_id', 'date', 'target']]
    .rename({
        'date': 'ds',
        'target': 'y'
    }, axis=1)
)

# Fill any NaN values
df_ts['y'] = df_ts['y'].fillna(0)

# Create an empty list to hold our results
ts_output = []

# Group the DataFrame by unique id
df_ts_gp = df_ts.groupby('item_id')

for group in df_ts_gp.groups:
    # Get the group (note that returned df is ungrouped)
    df_ts = df_ts_gp.get_group(group)
    ts_dict = create_single_time_series(df_ts)
    ts_output.append(ts_dict)
    
# No compute() call is needed: each function call above already ran eagerly

df_list = []

for item in ts_output:
    for record in item:
        df_list.append(record)

df_pandas = pd.DataFrame(df_list).sort_values(['item_id', 'ds'])
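
If you don't have the BigQuery table or Prophet to hand, the same delayed-per-group pattern can be tried on synthetic data. In this sketch `summarise_group` is a hypothetical, cheap stand-in for `create_single_time_series`, the data is invented, and `scheduler="threads"` is used so that no `Client` is required.

```python
import pandas as pd
from dask import compute, delayed


def summarise_group(df_group):
    """Stand-in for the Prophet model: returns a list with one record per group."""
    return [{
        "item_id": df_group["item_id"].iloc[0],
        "mean_y": df_group["y"].mean(),
    }]


df_demo = pd.DataFrame({
    "item_id": ["a", "a", "b", "b", "c"],
    "y": [1.0, 3.0, 10.0, 20.0, 5.0],
})

# Build one delayed task per group; nothing runs yet
tasks = [
    delayed(summarise_group)(df_group)
    for _, df_group in df_demo.groupby("item_id")
]

# Passing a list returns a one-element tuple containing the list of results
(results,) = compute(tasks, scheduler="threads")

# Flatten the per-group lists of records into a single DataFrame
records = [record for group in results for record in group]
df_out = pd.DataFrame(records).sort_values("item_id").reset_index(drop=True)

print(df_out)
```

Iterating over the `groupby` object directly yields each key together with its sub-frame, which avoids reassigning a variable inside the loop.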

### Further Reading

* [Dask Site](https://dask.org/)
* [Dask Tutorial](https://github.com/dask/dask-tutorial)
* [Dask API Reference](https://docs.dask.org/en/latest/)
* [Higher Order Functions](https://www.tutorialspoint.com/functional_programming/functional_programming_higher_order_functions.htm)