<center>
<img src="../../img/ods_stickers.jpg">
## Открытый курс по машинному обучению, 3 сессия
<center>Автор материала: Трофимов Артём Владимирович, @avt

# <center>Dask: когда не справляется Pandas</center>

Данный тьюториал содержит краткий обзор библиотеки Dask и более подробное описание возможностей dask.dataframe.
<br>
При подготовке тьюториала использовались данные [2017 NYC Taxi Rides](http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml).

### Что такое Dask?

**Dask - библиотека Python для параллельных вычислений.** Работает как на одной машине, максимально используя доступные вычислительные ресурсы, так и на кластере до 1000 ядер. Однако, как заметил разработчик Dask Matthew Rocklin: "Медианный размер кластера Dask - 1 компьютер".

### Компоненты Dask

1. **Big data collections** - параллельные "ленивые" обёртки для датафреймов Pandas, массивов NumPy и итераторов для работы с данными, размер которых превышает объем памяти.
2. **Dynamic task scheduling** - планировщик задач, оптимизированный для вычислений.

<center><img src="http://dask.pydata.org/en/latest/_images/collections-schedulers.png" height="30%" widht="30%"></center>

**В отдельные проекты выделены:**
1. [Dask-ML](http://dask.pydata.org/en/latest/machine-learning.html) - оптимизированные алгоритмы sklearn, dask-xgboost (!), dask-tensorflow (!) и про "это ваше машинное обучение" в масштабах кластера. 
2. [Dask-distributed](https://distributed.readthedocs.io/en/latest/) - про dask на распределенном кластере

### К теме тьюториала: использование dask.dataframe

Dask.dataframe - это распределенный pandas.DataFrame. Если Dask.dataframe не помещается в память, то в RAM последовательно подгружаются соответствующие объему памяти части, а "излишки" хранятся на диске.

### Какие проблемы pandas решает dask.dataframe?

**Проблема №1**: данные должны помещаться в память
<br>
**Решение Dask**: работает с данными, которые не умещаются в память
<br><br>
**Проблема №2**: вычисления в 1 поток
<br>
**Решение Dask**: автоматическая параллелизация

<img src="http://dask.pydata.org/en/latest/_images/dask-dataframe.svg" height="20%" width="20%">

**Интерфейс dask.dataframe аналогичен pandas:**

```
#pandas                                 #dask
import pandas as pd                     import dask.dataframe as dd
df = pd.read_csv('2015-01-01.csv')      df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean()     df.groupby(df.user_id).value.mean().compute()
```

### Эксперименты

In [None]:
import gc
import glob

import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd

**Имеется 2 файла:**

In [None]:
!ls data/*.csv

**Считывать будем только 4 столбца:** `VendorID`, `tpep_pickup_datetime`, `passenger_count`, `total_amount`
<br>
Этих данных достаточно для демонстрации возможностей Dask <s>да и комп у меня слабый</s> :trollface:
<br><br>
Параметры для считывания файлов:

In [None]:
params = dict(header=0, 
              usecols = [0, 1, 3, 16],
              dtype = {'1': 'datetime64'},
              #небольшой костыль для корректного считывания данных
              converters = {'Passenger_count': (lambda x: round(float(x), 0) // 1 if (x != 'NaN' or len(x) <= 5) else 0), 
                            'Total_amount': (lambda x: float(x) if (x != 'NaN' or len(x) <= 5) else 0)}
             )

### Читаем 1 файл

**pandas**

In [None]:
%%time
pandas_df = pd.read_csv('data/yellow_tripdata_2017-12.csv', **params)
pandas_df.head()

**dask**

In [None]:
%%time
dask_df = dd.read_csv('data/yellow_tripdata_2017-12.csv', **params)
dask_df.head()

>Dask справился значительно быстрее, потому что pandas сначала считывает файл и выводит первые 5, а dask считывает 5 строк и сразу их выводит.

Однако, когда файл помещается в оперативную память, pandas с уже загруженными данными серьезно превосходит dask, работающий по "ленивому" принципу - вычисления и обработка данных происходят непосредственно при вызове метода. Реализация "ленивого" подхода, в принципе, характерна для ресурсоемких операций. Особенно, когда дело касается "настоящей бигдаты".

**Следим за использованием памяти, удаляем ненужные объекты, собираем мусор:**

In [None]:
del pandas_df, dask_df
gc.collect()

### Загружаем 2 файла

**dask**

In [None]:
%%time
dask_df2 = dd.read_csv('data/*.csv', **params)
dask_df2.head()

**pandas**

In [None]:
%%time
pandas_df2 = pd.concat([pd.read_csv(fn, **params) for fn in glob.glob('data/*.csv')])
pandas_df2.head()

>Учитывая, что загружаемые файлы примерно одинакового размера (~800 Mb), видим, что время обработки увеличилось линейно. Очевидно, если грузить реально большой файл(-ы), pandas рано или поздно упрётся в лимит RAM.

### OK - памяти хватает, но считает медленно...

Для устранения этого неудобства можно просто преобразовать pandas.DataFrame в dask.datafram и считать всеми имеющимися ядрами. Автоматически, без дополнительного кода и настроек.

**Используем pandas_df2 из предыдущего примера:**

In [None]:
%%time
dask_df3 = dd.from_pandas(pandas_df2, npartitions=2, chunksize=None)

**pandas'овский датафрейм просто переопределим для нумерации датафреймов:**

In [None]:
%%time
pandas_df3 = pandas_df2

**Уборка:**

In [None]:
del pandas_df2
gc.collect()

### Speed-test: dask VS. pandas

Рассмотрим несколько примеров, наглядно демонстрирующих: с помощью dask можно значительно ускорить обработку данных.
<br><br>
Обратите внимание на метод `compute()` при обработке dask датафрейма - это как раз команда "посчитать". Без нее "ленивый" dask лишь определит, что нужно будет сделать непосредственно при запросе пользователя.

#### 1. max()

In [None]:
%%time
pandas_df3['total_amount'].max()

In [None]:
%%time
dask_df3['total_amount'].max().compute()

#### 2. value_counts()

In [None]:
%%time
pandas_df3['passenger_count'].value_counts()

In [None]:
%%time
pandas_df3['passenger_count'].value_counts()

#### 3. groupby() - sum()

In [None]:
%%time
pandas_df3.groupby(by='VendorID')['passenger_count'].sum()

In [None]:
%%time
dask_df3.groupby(by='VendorID')['passenger_count'].sum().compute()

>Очевидно, dask, автоматически используя доступные ресурсы, работает быстрее pandas даже при простых операциях.

Dask.dataframe API является частью Pandas API, но не является его полной копией - следует знать о некоторых ограничениях, например:
1. Операции, связанные с индексированием (новый индекс) несортированных данных, затратны с вычислительной точки зрения
2. Посторочная обработка работает медленно как в pandas, так и в dask

### Выводы

Dask - простой и мощный инструмент для чтения больших файлов и обработки данных. Использвание dask.dataframe позволяет максимально использовать ресурсы компьютера без дополнительного кода и настроек.

**dask.dataframe <font color="green">рекомендуется</font> использовать, когда:**
1. Необходимо считать и обработать данные, не помещающиеся в память
2. Конфигурация компьютера позволяет задействовать в вычислениях несколько ядер процессора
3. Распределенная обработка больших датасетов с помощью стандартных инструмнтов Pandas

**Использование dask.dataframe <font color="red">не рекомендуется</font>, когда:**
1. Данные помещаются в память - pandas может справляться быстрее
2. Данные не соответствуют табличному формату pandas
3. Необходимо использование функционала, не реализованного в dask.dataframe API

### Источники информации:

1. Очень крутая и подробная [документация Dask](http://dask.pydata.org/en/latest/docs.html)
2. Презентация Matthew Rocklin, разработчика Dask - [Dask: Parallel Programming in Python](http://matthewrocklin.com/slides/dask-short.html)
3. Материалы митапа "Машинное обучение в Новосибирске" - [Дмитрий Колодезев о Dask](https://www.youtube.com/watch?time_continue=193&v=emd2NOC05es)