Problem:

- Our occupational choice model is simple and static, but it has many parameters to estimate
- $K = $ occupations $\times$ years $\approx 3000$, $N = 50$ million, 20 GB

Using Python for data analysis:
- Pandas is great for datasets that fit comfortably in memory (< 1 GB?)
- Python itself was not designed for parallelization (the Global Interpreter Lock lets only one thread execute Python bytecode at a time)
- Move to Scala- and Java-based Big Data frameworks? (map-reduce approaches: Apache Spark, Hadoop)
    

# Dask: Scalable analytics in Python

Taken from https://towardsdatascience.com/why-every-data-scientist-should-use-dask-81b2b850e15b:

Dask is the most **revolutionary tool for data processing**:

1. It handles data **larger than RAM**
2. It **supports the Pandas dataframe** and **NumPy array** data structures
3. It runs on your **local computer** or scales to a **cluster**
4. With **minimal code changes**, it runs your code in **parallel**

Basics:
- Dask has been maintained by Anaconda developers since 2015
- It has an amazing dashboard to see what is going on
- Main package:

`conda install dask`
- Make scikit-learn work with large datasets:

`conda install -c conda-forge dask-ml`

Dask is a flexible library for parallel computing in Python with two main features:
1. Dynamic task scheduler + task graph execution (in parallel where possible, similar to OpenMP?)
2. “Big Data” collections like parallel arrays, dataframes, and lists that extend NumPy and Pandas to larger-than-memory or distributed environments
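
As a rough illustration of both features, here is a minimal sketch using `dask.array`. The array size and chunk shape are arbitrary choices for this example: building the expression only records tasks in a graph, and `compute()` hands the graph to the scheduler.

```python
import numpy as np
import dask.array as da

# A 1000 x 1000 array stored as 16 chunks of 250 x 250 (sizes chosen for illustration)
x = da.ones((1000, 1000), chunks=(250, 250))

# Lazy: this only builds a task graph over the chunks
col_means = (x + 1).mean(axis=0)

# compute() executes the graph and returns an ordinary NumPy array
result = col_means.compute()
print(type(result), result.shape)
```

Each chunk is a regular NumPy array, so only the chunks currently being processed need to be in memory.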


What can you use it for? (I need it for point two):
1. Make your numpy (+ numba) code parallel with minimal adjustments
2. Work on datasets larger than memory


# 1. Parallelize code with dask delayed

In [None]:
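import dask
from dask.distributed import Client, LocalCluster

# Start a scheduler and workers on this machine;
# displaying the client in Jupyter shows the dashboard link
cluster = LocalCluster()
client = Client(cluster)
client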



In [None]:
import numpy as np
from dask import delayed, compute
from time import sleep

@delayed
def increase(x):
    sleep(0.0)
    y = x + 1
    return y

@delayed
def square(x):
    sleep(0.0)
    y = x**2
    return y

@delayed
def summed(l):
    sleep(0.0)
    y = np.sum(l)
    return y

# Build the task graph; nothing is executed yet
squared = []
for x in range(5):
    x_inc = increase(x)
    x_square = square(x_inc)
    squared.append(x_square)

sum_of_squares = summed(squared)

# Prints a Delayed object, not the result
print(sum_of_squares)

In [None]:
sum_of_squares.visualize()

In [None]:
sum_of_squares.compute()
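
The same graph can be built without decorators by wrapping each call in `delayed(...)`. The sketch below is a stand-alone variant of the cells above (it redefines `increase` and `square` as plain functions and uses the built-in `sum`), and it compares the result with a plain-Python reference. The synchronous scheduler is chosen here only to keep the sketch single-process.

```python
from dask import delayed

def increase(x):
    return x + 1

def square(x):
    return x ** 2

# delayed(f)(args) records the call in a task graph instead of running it
lazy_squares = [delayed(square)(delayed(increase)(x)) for x in range(5)]
lazy_total = delayed(sum)(lazy_squares)

# Only compute() runs the graph
total = lazy_total.compute(scheduler="synchronous")

# Plain-Python reference value for comparison
reference = sum((x + 1) ** 2 for x in range(5))
print(total == reference)
```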

# 2. Dask Dataframes

In [None]:
# pandas reads everything into memory

import pandas as pd

n_obs = 1000
ages = np.round(np.random.uniform(20, 50, n_obs), 0)
skills = np.random.randn(n_obs)

df = pd.DataFrame({'age': ages, 'skill': skills})

df['wage'] = df['skill'] * 3 + df['age'] * 2 + np.random.randn(n_obs)

In [None]:
import dask.dataframe as dd

df = dd.from_pandas(df, npartitions=5)

In [None]:
df

## 2.1 Basic operations

In [None]:
df.head(5)

In [None]:
len(df)

In [None]:
df['age'].mean().compute()

In [None]:
df.std().compute()

## 2.2 More advanced operations

### 2.2.1 Groupby

In [None]:
df.groupby('age')[['skill', 'wage']].mean().compute()

### 2.2.2 Map your functions to each partition

In [None]:
def square(partition):
    # Each partition arrives as a pandas Series
    return partition**2

df['age_squared'] = df['age'].map_partitions(square)

In [None]:
df.visualize()

In [None]:
df.head(5)

### 2.2.3 Save large files efficiently

Problem: the number of tasks in the graph quickly becomes large, which makes it slow

Solution: save intermediate steps to disk

In [None]:
df.to_parquet('parquet', engine='pyarrow')

In [None]:
df = dd.read_parquet('parquet', engine='pyarrow')

In [None]:
df

### 2.2.4 Online Regression - Estimate a linear relationship in batches

Google does not re-estimate its complete model every time a single new observation arrives...

https://en.wikipedia.org/wiki/Online_machine_learning
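
To see why batch-wise estimation can work at all, here is a NumPy-only sketch. It is not the stochastic gradient method used in the cells below but a simpler alternative: for a linear model, the sums $X'X$ and $X'y$ can be accumulated one batch at a time, so the full dataset never has to be in memory at once. The simulated data mirrors the wage equation from section 2; the seed and the five batches are arbitrary choices for this example.

```python
import numpy as np

rng = np.random.default_rng(0)
n_obs = 1000
age = np.round(rng.uniform(20, 50, n_obs))
skill = rng.standard_normal(n_obs)
wage = 3 * skill + 2 * age + rng.standard_normal(n_obs)

# Design matrix with an intercept column
X = np.column_stack([np.ones(n_obs), age, skill])

# Accumulate X'X and X'y one batch at a time
xtx = np.zeros((3, 3))
xty = np.zeros(3)
for X_batch, y_batch in zip(np.array_split(X, 5), np.array_split(wage, 5)):
    xtx += X_batch.T @ X_batch
    xty += X_batch.T @ y_batch

# Solve the normal equations once all batches have been seen
beta_batched = np.linalg.solve(xtx, xty)

# Reference: least squares on the full data in one go
beta_full = np.linalg.lstsq(X, wage, rcond=None)[0]
print(beta_batched, beta_full)
```

With $K \approx 3000$ parameters, $X'X$ is still only a $3000 \times 3000$ matrix, whereas stochastic gradient descent avoids forming it altogether and updates the coefficients after each batch.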

In [None]:
from dask_ml.wrappers import Incremental
from sklearn.linear_model import SGDRegressor

ols_sgd = Incremental(
    SGDRegressor(
        loss='squared_error',  # named 'squared_loss' before scikit-learn 1.0
        penalty=None,          # no regularization, i.e. plain least squares
    )
)

https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.SGDRegressor.html#sklearn.linear_model.SGDRegressor

In [None]:
y = df['wage'].values
X = df[['age', 'skill']].values

In [None]:
y

In [None]:
X

In [None]:
results = ols_sgd.fit(X, y)

In [None]:
results

In [None]:
results.coef_

In [None]:
results.intercept_

In [None]:
prediction = results.predict(X)

In [None]:
prediction

In [None]:
prediction.compute()

In [None]:
len(prediction.compute())