# dask

From the Dask DataFrame [best practices](https://docs.dask.org/en/latest/dataframe-best-practices.html) page:
> For data that fits into RAM, Pandas can often be faster and easier to use than Dask DataFrame. While “Big Data” tools can be exciting, they are almost always worse than normal data tools while those remain appropriate.

# Clusters and clients

[scheduling](https://docs.dask.org/en/latest/scheduling.html)

## Single machine scheduler

[docs](https://docs.dask.org/en/latest/setup/single-machine.html)

In [None]:
from time import sleep
import dask
from dask import delayed

@delayed
def inc(x):
    """Return ``x + 1`` after pausing for one second.

    The one-second pause stands in for real work so that timing differences
    between schedulers are visible. Because of the ``delayed`` decorator,
    calling this only records a task; the body runs on ``.compute()``.
    """
    sleep(1)
    incremented = x + 1
    return incremented

@delayed
def add(x, y):
    """Return ``x + y`` after pausing for one second.

    Like ``inc``, the pause simulates a slow task, and the ``delayed``
    decorator defers execution until ``.compute()`` is called.
    """
    sleep(1)
    total = x + y
    return total

# Build the task graph: two independent increments feeding a single addition.
# Nothing executes here; the work happens when z.compute() is called.
left = inc(1)
right = inc(2)
z = add(left, right)

In [None]:
%%time
# Run the graph with the 'synchronous' scheduler. The graph has three tasks
# that each sleep for one second, so expect roughly 3 s if they run serially.
z.compute(scheduler='synchronous')

In [None]:
%%time
# Run the same graph with the threaded scheduler and two workers. The two
# inc() tasks are independent, so they can overlap; add() must wait for both
# (expected roughly 2 s).
z.compute(scheduler='threads', num_workers=2)

In [None]:
%%time
# Run the same graph with the process-based scheduler and two workers.
# NOTE(review): expect timing similar to the threaded run plus some process
# start-up overhead; confirm on the target machine.
z.compute(scheduler='processes', num_workers=2)

## Local cluster

In [None]:
import dask
from dask.distributed import Client, LocalCluster

- Synchronous: `processes=False, n_workers=1, threads_per_worker=1`
- Threads: `processes=False, n_workers=5, threads_per_worker=1`
- Processes: `processes=True, n_workers=5, threads_per_worker=1`

In [None]:
# One worker with one thread, living in this process: the "Synchronous"
# configuration from the list above. Swap the keyword values to try the
# "Threads" or "Processes" variants.
cluster = LocalCluster(
    n_workers=1,
    threads_per_worker=1,
    processes=False,
)
client = Client(cluster)

In [None]:
# Tear down in reverse order of creation: first the client that is connected
# to the cluster, then the cluster itself.
client.close()
cluster.close()

### Memory

In [None]:
import numpy as np

In [None]:
import dask.array as da

# Describe the computation: random n-by-n matrix -> its inverse -> sum of
# all entries. No .compute() is called in this cell.
n = 10_000
shape = (n, n)
x = da.random.random(shape)
y = da.linalg.inv(x)
z = da.sum(y)

In [None]:
# Two single-threaded worker processes with an explicit memory limit.
# NOTE(review): per the LocalCluster docs the limit applies per worker, not
# to the whole cluster; the next cell inspects one worker to confirm.
cluster = LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=True,
    memory_limit='10000MB',
)
client = Client(cluster)

In [None]:
# Look up the worker stored under key 0 and show the memory limit it was
# actually given.
cluster.workers[0].memory_limit

In [None]:
# Shut down the memory-limited cluster: client first, then the cluster.
client.close()
cluster.close()

# Example

In [None]:
import numpy as np
import pandas as pd
import statsmodels.formula.api as smf
import dask.array as da
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
import dask.distributed

In [None]:
# Plain-pandas baseline: read only three columns and pass the state/year
# filters to the parquet engine.
df = pd.read_parquet(
    '/InfoGroup/data/processed/full.pq/',
    engine='fastparquet',
    columns=['YEAR', 'EMPLOYEES', 'NAICS'],
    filters=[('STATE', '=', 'WI'), ('YEAR', 'in', [2000, 2001])],
)

In [None]:
# Create a distributed client with default settings; the bare `client`
# expression on the last line displays its summary in the notebook.
client = Client()
client

In [None]:
# Dask version of the same subset: open the whole dataset, keep the
# Wisconsin rows for 2000-2001, and persist the result on the cluster.
df = dd.read_parquet('/InfoGroup/data/processed/full.pq/')
in_wisconsin = df['STATE'] == 'WI'
in_years = df['YEAR'].isin([2000, 2001])
df = client.persist(df[in_wisconsin & in_years])

In [None]:
# The first element of the dask DataFrame's shape (the row count) is a lazy
# value, so it has to be computed explicitly.
x = df.shape
x[0].compute()

In [None]:
# Materialise the filtered frame's index locally for inspection.
z = df.index.compute()

In [None]:
# Display the computed index.
z

In [None]:
# Check whether any index label occurs more than once.
# NOTE(review): presumably this checks whether index values repeat across
# partitions of the parquet dataset; confirm.
z.has_duplicates

In [None]:
# Show the last 100 index labels.
z[-100:]