# <center>Dask: когда не справляется Pandas</center>

# Используйте NumPy 
- Если ваши данные удобно помещаются в ОЗУ и вы не ограничены в производительности, то использование NumPy может быть правильным выбором. Dask добавляет еще один уровень сложности, который может помешать. Если вы просто ищете ускорение, а не масштабируемость, вы можете рассмотреть такой проект, как Numba. [Подробнее про лучшие практики](https://docs.dask.org/en/stable/array-best-practices.html)

Данный тьюториал содержит краткий обзор библиотеки Dask и более подробное описание возможностей dask.dataframe.
<br>
При подготовке тьюториала использовались данные [2017 NYC Taxi Rides](http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml).

### Что такое Dask?

In [None]:
#https://docs.dask.org/en/stable/

**Dask - библиотека Python для параллельных вычислений.** Работает как на одной машине, максимально используя доступные вычислительные ресурсы, так и на кластере до 1000 ядер. Однако, как заметил разработчик Dask Matthew Rocklin: "Медианный размер кластера Dask - 1 компьютер".

### Компоненты Dask

1. **Big data collections** - параллельные "ленивые" обёртки для датафреймов Pandas, массивов NumPy и итераторов для работы с данными, размер которых превышает объем памяти.
2. **Dynamic task scheduling** - планировщик задач, оптимизированный для вычислений.

<center><img src="https://docs.dask.org/en/stable/_images/dask-overview.svg" height="30%" widht="30%"></center>

**В отдельные проекты выделены:**
1. [Dask-ML](https://ml.dask.org/) - оптимизированные алгоритмы sklearn, dask-xgboost (!), dask-tensorflow (!) и про "это ваше машинное обучение" в масштабах кластера. 
2. [Dask-distributed](https://distributed.readthedocs.io/en/latest/) - про dask на распределенном кластере

### К теме тьюториала: использование dask.dataframe

Dask.dataframe - это распределенный pandas.DataFrame. Если Dask.dataframe не помещается в память, то в RAM последовательно подгружаются соответствующие объему памяти части, а "излишки" хранятся на диске.

### Какие проблемы pandas решает dask.dataframe?

**Проблема №1**: данные должны помещаться в память
<br>
**Решение Dask**: работает с данными, которые не умещаются в память
<br><br>
**Проблема №2**: вычисления в 1 поток
<br>
**Решение Dask**: автоматическая параллелизация

<img src="http://dask.pydata.org/en/latest/_images/dask-dataframe.svg" height="20%" width="20%">

**Интерфейс dask.dataframe аналогичен pandas:**

```
#pandas                                 #dask
import pandas as pd                     import dask.dataframe as dd
df = pd.read_csv('2015-01-01.csv')      df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean()     df.groupby(df.user_id).value.mean().compute()
```

### Эксперименты

In [1]:
import pandas as pd
import numpy as np
import glob
import dask
import dask.dataframe as dd
import gc

**Имеется 2 файла:**

In [2]:
#!ls data/*.csv

**Считывать будем только 4 столбца:** `VendorID`, `tpep_pickup_datetime`, `passenger_count`, `total_amount`
<br>
Этих данных достаточно для демонстрации возможностей Dask <s>да и комп у меня слабый</s> :trollface:
<br><br>
Параметры для считывания файлов:

In [2]:
params = dict(header=0, 
              usecols = [0, 1, 3, 16],
              dtype = {'1': 'datetime64'},
              #небольшой костыль для корректного считывания данных
              converters = {'Passenger_count': (lambda x: round(float(x), 0) // 1 if (x != 'NaN' or len(x) <= 5) else 0), 
                            'Total_amount': (lambda x: float(x) if (x != 'NaN' or len(x) <= 5) else 0)}
             )

### Читаем 1 файл

**pandas**

In [4]:
#https://data.cityofnewyork.us/Transportation/2017-Yellow-Taxi-Trip-Data/biws-g3hs данные тут

In [20]:
%%time
pandas_df = pd.read_csv('2017_Yellow_Taxi_Trip_Data.csv', **params)
pandas_df.tail()

  exec(code, glob, local_ns)


CPU times: total: 13.2 s
Wall time: 13.5 s


Unnamed: 0,VendorID,tpep_pickup_datetime,passenger_count,total_amount
16200443,2,05/05{,,
16200444,"""error"" : true",,,
16200445,"""message"" : ""Internal error""",,,
16200446,"""status"" : 500",,,
16200447,},,,


In [22]:
pandas_df.passenger_coun = pandas_df.passenger_count.dropna()

  pandas_df.passenger_coun = pandas_df.passenger_count.dropna()


In [None]:
pandas_df.tail()

In [24]:
pandas_df.passenger_count.mean()

1.6206014860210922

**dask**

In [3]:
%%time
dask_df = dd.read_csv('2017_Yellow_Taxi_Trip_Data.csv', **params, assume_missing=True)
dask_df.head()

CPU times: total: 625 ms
Wall time: 3.6 s


Unnamed: 0,VendorID,tpep_pickup_datetime,passenger_count,total_amount
0,2.0,03/17/2017 12:19:39 AM,1.0,12.3
1,2.0,03/17/2017 12:19:39 AM,1.0,6.62
2,2.0,03/17/2017 12:19:39 AM,5.0,5.3
3,2.0,03/17/2017 12:19:39 AM,1.0,11.76
4,2.0,03/17/2017 12:19:40 AM,1.0,17.76


In [19]:
dask_df.passenger_count.mean().compute()

  df = pandas_read_text(


1.6206014860210922

In [13]:
#dask_df.tail()


>Dask справился значительно быстрее, потому что pandas сначала считывает файл и выводит первые 5, а dask считывает 5 строк и сразу их выводит.

Однако, когда файл помещается в оперативную память, pandas с уже загруженными данными серьезно превосходит dask, работающий по "ленивому" принципу - вычисления и обработка данных происходят непосредственно при вызове метода. Реализация "ленивого" подхода, в принципе, характерна для ресурсоемких операций. Особенно, когда дело касается "настоящей бигдаты".

**Следим за использованием памяти, удаляем ненужные объекты, собираем мусор:**

In [14]:
#del pandas_df, dask_df
gc.collect()

1641

### Загружаем 2 файла

**dask**

In [7]:
# %%time
# dask_df2 = dd.read_csv('data/*.csv', **params)
# dask_df2.head()

CPU times: user 984 ms, sys: 132 ms, total: 1.12 s
Wall time: 2.46 s


**pandas**

In [8]:
# %%time
# pandas_df2 = pd.concat([pd.read_csv(fn, **params) for fn in glob.glob('data/*.csv')])
# pandas_df2.head()

CPU times: user 25.9 s, sys: 2.41 s, total: 28.3 s
Wall time: 38.4 s


>Учитывая, что загружаемые файлы примерно одинакового размера (~800 Mb), видим, что время обработки увеличилось линейно. Очевидно, если грузить реально большой файл(-ы), pandas рано или поздно упрётся в лимит RAM.

### OK - памяти хватает, но считает медленно...

Для устранения этого неудобства можно просто преобразовать pandas.DataFrame в dask.datafram и считать всеми имеющимися ядрами. Автоматически, без дополнительного кода и настроек.

**Используем pandas_df2 из предыдущего примера:**

In [25]:
%%time
dask_df3 = dd.from_pandas(pandas_df, npartitions=10, chunksize=None)

CPU times: total: 3.06 s
Wall time: 3.1 s


**pandas'овский датафрейм просто переопределим для нумерации датафреймов:**

In [26]:
%%time
pandas_df3 = pandas_df

CPU times: total: 0 ns
Wall time: 0 ns


**Уборка:**

In [27]:
#del pandas_df2
gc.collect()

1945

### Speed-test: dask VS. pandas

Рассмотрим несколько примеров, наглядно демонстрирующих: с помощью dask можно значительно ускорить обработку данных.
<br><br>
Обратите внимание на метод `compute()` при обработке dask датафрейма - это как раз команда "посчитать". Без нее "ленивый" dask лишь определит, что нужно будет сделать непосредственно при запросе пользователя.

#### 1. max()

In [28]:
%%time
pandas_df3['total_amount'].max()

CPU times: total: 15.6 ms
Wall time: 18 ms


538482.68

In [29]:
%%time
dask_df3['total_amount'].max().compute()

CPU times: total: 31.2 ms
Wall time: 14 ms


538482.68

#### 2. value_counts()

In [34]:
%%time
pandas_df3['passenger_count'].value_counts()

CPU times: total: 188 ms
Wall time: 181 ms


1.0    11647707
2.0     2328532
5.0      771051
3.0      660530
6.0      474534
4.0      316162
0.0        1801
8.0          50
7.0          45
9.0          31
Name: passenger_count, dtype: int64

In [35]:
%%time
dask_df3['passenger_count'].value_counts().compute()

CPU times: total: 234 ms
Wall time: 243 ms


1.0    11647707
2.0     2328532
5.0      771051
3.0      660530
6.0      474534
4.0      316162
0.0        1801
8.0          50
7.0          45
9.0          31
Name: passenger_count, dtype: int64

#### 3. groupby() - sum()

In [36]:
%%time
pandas_df3.groupby(by='VendorID')['passenger_count'].sum()

CPU times: total: 969 ms
Wall time: 963 ms


VendorID
1                                  9259919.0
2                                 16973998.0
  "error" : true                         0.0
  "message" : "Internal error"           0.0
  "status" : 500                         0.0
1                                     7459.0
2                                    13086.0
}                                        0.0
Name: passenger_count, dtype: float64

In [37]:
%%time
dask_df3.groupby(by='VendorID')['passenger_count'].sum().compute()

CPU times: total: 969 ms
Wall time: 774 ms


VendorID
1                                  9259919.0
2                                 16973998.0
  "error" : true                         0.0
  "message" : "Internal error"           0.0
  "status" : 500                         0.0
1                                     7459.0
2                                    13086.0
}                                        0.0
Name: passenger_count, dtype: float64

>Очевидно, dask, автоматически используя доступные ресурсы, работает быстрее pandas даже при простых операциях.

Dask.dataframe API является частью Pandas API, но не является его полной копией - следует знать о некоторых ограничениях, например:
1. Операции, связанные с индексированием (новый индекс) несортированных данных, затратны с вычислительной точки зрения
2. Посторочная обработка работает медленно как в pandas, так и в dask

### Выводы

Dask - простой и мощный инструмент для чтения больших файлов и обработки данных. Использвание dask.dataframe позволяет максимально использовать ресурсы компьютера без дополнительного кода и настроек.

**dask.dataframe <font color="green">рекомендуется</font> использовать, когда:**
1. Необходимо считать и обработать данные, не помещающиеся в память
2. Конфигурация компьютера позволяет задействовать в вычислениях несколько ядер процессора
3. Распределенная обработка больших датасетов с помощью стандартных инструмнтов Pandas

**Использование dask.dataframe <font color="red">не рекомендуется</font>, когда:**
1. Данные помещаются в память - pandas может справляться быстрее
2. Данные не соответствуют табличному формату pandas
3. Необходимо использование функционала, не реализованного в dask.dataframe API

### Источники информации:

1. Очень крутая и подробная [документация Dask](http://dask.pydata.org/en/latest/docs.html)
2. Презентация Matthew Rocklin, разработчика Dask - [Dask: Parallel Programming in Python](http://matthewrocklin.com/slides/dask-short.html)
3. Материалы митапа "Машинное обучение в Новосибирске" - [Дмитрий Колодезев о Dask](https://www.youtube.com/watch?time_continue=193&v=emd2NOC05es)

In [29]:
params = dict(header=0, 
              usecols = [0, 1, 3, 16],
              dtype = {'1': 'datetime64','VendorID': 'int64'},
              #небольшой костыль для корректного считывания данных
              converters = {'Passenger_count': (lambda x: round(float(x), 0) // 1 if (x != 'NaN' or len(x) <= 5) else 0), 
                            'Total_amount': (lambda x: float(x) if (x != 'NaN' or len(x) <= 5) else 0),
                            'VendorID': (lambda x: int(x) if (x != 'NaN' or len(x) <= 5) else 0)}
             )

In [30]:
import dask.dataframe as dd

# Dask dataframe
ddf = dd.read_csv("2017_Yellow_Taxi_Trip_Data.csv",**params)
print(ddf.head())



  return func(*args, **kwargs)


   VendorID    tpep_pickup_datetime  passenger_count  total_amount
0         2  03/17/2017 12:19:39 AM                1         12.30
1         2  03/17/2017 12:19:39 AM                1          6.62
2         2  03/17/2017 12:19:39 AM                5          5.30
3         2  03/17/2017 12:19:39 AM                1         11.76
4         2  03/17/2017 12:19:40 AM                1         17.76


In [31]:
ddf.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 4 entries, VendorID to total_amount
dtypes: object(1), float64(1), int64(2)

In [38]:
ddf.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 4 entries, VendorID to total_amount
dtypes: object(1), float64(1), int64(2)