# Dask Array

Материалы:
* Макрушин С.В. Лекция 11: Dask
* https://docs.dask.org/en/latest/array.html
* JESSE C. DANIEL. Data Science with Python and Dask. 

In [1]:
from dask import dataframe as dd
from time import time
import numpy as np
import dask
import dask.array as da
import h5py

In [2]:
path1 = "D:/FinUniver/Технологии обработки больших данных/Семинары/11_dask_array/11_dask_array_data/"

In [3]:
dask.__version__

'2.30.0'

## Задачи для совместного разбора

1. Создайте массив размерностью 1000 на 300000, заполненный числами из стандартного нормального распределения. Исследуйте основные характеристики полученного массива.

In [7]:
%%time 
arr_np = np.random.normal(0, 1, size=(1000, 300000))

Wall time: 12 s


In [4]:
%%time 
arr_da = da.random.normal(0, 1, size=(1000, 300000), chunks=(1000, 30000))
arr_da

Wall time: 1.99 ms


Unnamed: 0,Array,Chunk
Bytes,2.40 GB,240.00 MB
Shape,"(1000, 300000)","(1000, 30000)"
Count,10 Tasks,10 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 2.40 GB 240.00 MB Shape (1000, 300000) (1000, 30000) Count 10 Tasks 10 Chunks Type float64 numpy.ndarray",300000  1000,

Unnamed: 0,Array,Chunk
Bytes,2.40 GB,240.00 MB
Shape,"(1000, 300000)","(1000, 30000)"
Count,10 Tasks,10 Chunks
Type,float64,numpy.ndarray


2. Посчитайте сумму квадратов элементов массива, созданного в задаче 1. Создайте массив `np.array` такого же размера и сравните скорость решения задачи с использование `da.array` и `np.array`

In [8]:
%%time
np.power(arr_np, 2).sum()

Wall time: 32.6 s


300030407.73229486

In [33]:
type(arr_da)

dask.array.core.Array

In [5]:
%%time
da.power(arr_da, 2).sum().compute()

Wall time: 13.7 s


300047323.32766014

3. Визуализируйте граф вычислений для задачи 2.

In [9]:
da.power(arr_da, 2).sum().visualize()

RuntimeError: Drawing dask graphs requires the `graphviz` python library and the `graphviz` system library to be installed.

## Лабораторная работа 11

1. Считайте датасет `recipe` из файла `minutes_n_ingredients_full.hdf5` в виде `dask.array`. Укажите аргумент `chunks=(100_000, 3)` при создании массива. Выведите на экран основную информацию о массиве.

In [4]:
with h5py.File(path1 + "minutes_n_ingredients_full.hdf5", 'r') as hdf:
    ds1 = hdf["recipe"]
    description = ds1.attrs['description']
    da_recipe = da.from_array(ds1[:], chunks=(100000, 3))
da_recipe

Unnamed: 0,Array,Chunk
Bytes,53.56 MB,2.40 MB
Shape,"(2231637, 3)","(100000, 3)"
Count,23 Tasks,23 Chunks
Type,int64,numpy.ndarray
"Array Chunk Bytes 53.56 MB 2.40 MB Shape (2231637, 3) (100000, 3) Count 23 Tasks 23 Chunks Type int64 numpy.ndarray",3  2231637,

Unnamed: 0,Array,Chunk
Bytes,53.56 MB,2.40 MB
Shape,"(2231637, 3)","(100000, 3)"
Count,23 Tasks,23 Chunks
Type,int64,numpy.ndarray


In [5]:
description

'Содержит столбцы id, minutes и n_ingredients из recipes_full.csv'

2. Вычислите среднее значение по каждому столбцу, кроме первого. 

In [6]:
da_recipe[:, 1:].mean(axis=0).compute()

array([1004.20805176,    5.4198008 ])

3. Исследуйте, как влияет значение аргумента `chunks` при создании `dask.array` на скорость выполнения операции поиска среднего. 

In [7]:
def calc_dask_time(chunk: tuple):
    with h5py.File(path1 + "minutes_n_ingredients_full.hdf5", 'r') as hdf:
        ds1 = hdf["recipe"]
        da_buf = da.from_array(ds1[:], chunks=chunk)
    timer = time()
    da_buf[:, 1:].mean(axis=0).compute()
    return time() - timer

In [8]:
calc_dask_time((50000, 3))

0.031243324279785156

In [9]:
calc_dask_time((150000, 3))

0.03124523162841797

In [10]:
calc_dask_time((200000, 3))

0.031243562698364258

In [13]:
calc_dask_time((300000, 3))

0.03124713897705078

4. Выберите рецепты, время выполнения которых меньше медианного значения

In [14]:
da_recipe[da_recipe[:, 1] < da.median(da_recipe[:, 1], axis=0)].compute()

array([[1089012,      23,       5],
       [1428572,       0,       5],
       [1400250,      24,       1],
       ...,
       [1029131,      19,       4],
       [1700703,       1,       1],
       [ 713836,       0,       9]], dtype=int64)

5. Посчитайте количество каждого из возможных значений кол-ва ингредиентов

In [15]:
df = dd.from_array(da_recipe)
df.groupby(by=2)[2].count().compute()

2
1     222071
2     224158
3     229388
4     234948
5     240720
6     244360
7     247181
8     246747
9     246816
10     22430
11     19094
12     15165
13     11640
14      8284
15      6014
16      4145
17      2793
18      1913
19      1279
20       852
21       529
22       346
23       244
24       178
25       107
26        68
27        55
28        33
29        22
31        13
32         5
35         4
33         4
43         1
39         1
37         2
30        20
40         2
34         3
38         1
36         1
Name: 2, dtype: int64

6. Найдите максимальную продолжительность рецепта. Ограничьте максимальную продолжительность рецептов сверху значением, равному 75% квантилю.

In [16]:
df[df[1] < df[1].quantile(0.75)][1].max().compute()

48

7. Создайте массив `dask.array` из 2 чисел, содержащих ваши предпочтения относительно времени выполнения рецепта и кол-ва ингредиентов. Найдите наиболее похожий (в смысле $L_1$) рецепт из имеющихся в датасете.

In [20]:
da_like_recipe = da.array([4563, 42])

da_recipe[(da.power(da_recipe[:, [1, 2]] - da_like_recipe, 2)).mean(axis=1).argmin()].compute()

array([225084,   4560,     17], dtype=int64)

8. Работая с исходным файлом в формате `hdf5`, реализуйте алгоритм подсчета среднего значения в блочной форме и вычислите с его помощью среднее значение второго столбца в массиве.

Блочный алгоритм вычислений состоит из двух частей:
1. загрузка фрагмента за фрагментом данных по `blocksize` элементов и проведение вычислений на этим фрагментом;
2. агрегация результатов вычислений на различных фрагментах для получения результата на уровне всего набора данных.

Важно: при работе с `h5py` в память загружаются не все элементы, а только те, которые запрашиваются в данный момент

In [142]:
def aver_block(file: str, column_number: int) -> float:
    blocksize = 339
    arr_mean = []
    with h5py.File(file, 'r') as hdf:
        ds1 = hdf["recipe"]
        for i in range(0, len(ds1), blocksize):
            da_arr = da.from_array(ds1[i:i+blocksize], chunks=(blocksize, 3))
            arr_mean.append(da_arr.mean(axis=0)[column_number])
    res = da.vstack(arr_mean)
    return (res.sum() / len(res)).compute()

In [143]:
aver_block(path1 + "minutes_n_ingredients_full.hdf5", 1)

1004.2080517575213