<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg" 
     width="30%" 
     align=right
     alt="Dask logo">

Custom Workflows
------------------

We submit tasks directly to the task scheduler. This demonstrates the flexibility that can be achieved with the `submit` function and normal Python `for` loops.

Later on we map functions across Python queues to construct data processing pipelines.

In [None]:
from dask.distributed import Client, progress

# Address handed to the distributed Client below.
# NOTE(review): presumably a scheduler is already listening here -- confirm
# before running the notebook.
scheduler_address = 'localhost:8786'
c = Client(scheduler_address)

# Leave the client as the cell's last expression so the notebook displays it.
c

In [None]:
import dask.dataframe as dd

# Demo time series between '2010' and '2016' with three typed columns,
# generated with a fixed seed.  The partition frequency uses the canonical
# uppercase pandas offset alias ('7D') rather than the lowercase spelling.
column_dtypes = {'value': float, 'name': str, 'id': int}
df = dd.demo.make_timeseries('2010', '2016', column_dtypes,
                             freq='10s', partition_freq='7D', seed=1)

# Keep only rows whose `value` is positive, then fix the column order.
positive_rows = df[df.value > 0]
df = positive_rows[['id', 'value', 'name']]

# Preview the first rows of the filtered dataframe.
df.head()

In [None]:
# Ask the client to persist the dataframe and rebind `df` to the returned
# collection, so the cells below work with the persisted version.
# NOTE(review): presumably this keeps the computed partitions on the
# cluster -- confirm against the dask.distributed documentation.
df = c.persist(df)
# Show a progress display for the persisted dataframe.
progress(df)

In [None]:
# Time how long it takes to count the rows of the dataframe.
%time len(df)

In [None]:
# Time a full groupby: mean of `value` per `id`, keep the 10 largest means,
# and compute the concrete result.
%time df.groupby(df.id).value.mean().nlargest(10).compute()

### Timeseries operations

In [None]:
# Standard deviation of `value` within each one-day resampling bin.
# '1D' is the canonical uppercase pandas offset alias for a day.
daily_std = df.value.resample('1D').std()

# Preview the first rows of the resampled result.
daily_std.head()

### Visualize algorithms: Typical Groupby

In [None]:
# Rebuild `df` as a demo time series covering 2010, seeded for repeatability.
# NOTE(review): the 'M' alias in partition_freq may be deprecated in newer
# pandas releases -- confirm against the installed version.
df = dd.demo.make_timeseries(
    '2010-01-01', '2010-12-31',
    {'value': float, 'name': str, 'id': int},
    freq='10s', partition_freq='1M', seed=1,
)

# Build the sum over `value` and visualize it instead of computing it.
total_value = df.value.sum()
total_value.visualize()

In [None]:
# Mean of `value` per `id`, visualized rather than computed.
mean_by_id = df.groupby(df.id).value.mean()
mean_by_id.visualize()

### Visualize algorithms: Datetime-resampling

In [None]:
# Rebuild `df` as a demo time series covering 2010, seeded for repeatability.
df = dd.demo.make_timeseries('2010-01-01', '2010-12-31',
                             {'value': float, 'name': str, 'id': int},
                             freq='10s', partition_freq='1M', seed=1)

# Weekly mean of `value`, visualized rather than computed.
# '1W' is the canonical uppercase pandas offset alias for a week.
weekly_mean = df.value.resample('1W').mean()
weekly_mean.visualize()

In [None]:
# Compute the weekly mean of `value` to a concrete result.
# '1W' is the canonical uppercase pandas offset alias for a week.
df.value.resample('1W').mean().compute()

### Visualize algorithms: Rolling-aggregation

In [None]:
# Rebuild `df` as a demo time series from 2010-01-01 to 2010-08-30, seeded
# for repeatability.
df = dd.demo.make_timeseries(
    '2010-01-01', '2010-08-30',
    {'value': float, 'name': str, 'id': int},
    freq='10s', partition_freq='1M', seed=1,
)

# Rolling mean of `value` over a window of 100; visualize it with
# rankdir='LR'.
rolling_mean = df.value.rolling(100).mean()
rolling_mean.visualize(rankdir='LR')

In [None]:
# Rolling mean of `value` over a window of 100; preview the first rows.
windowed_mean = df.value.rolling(100).mean()
windowed_mean.head()