# Dask Array

Материалы:
* Макрушин С.В. Лекция 11: Dask
* https://docs.dask.org/en/latest/array.html
* JESSE C. DANIEL. Data Science with Python and Dask. 

## Задачи для совместного разбора

In [9]:
import numpy as np
import dask
import dask.threaded as dthr
from dask.threaded import get
from dask.delayed import Delayed
import dask.array as da

1. Создайте массив размерностью 1000 на 300000, заполненный числами из стандартного нормального распределения. Исследуйте основные характеристики полученного массива.

In [10]:
arr = np.random.standard_normal((1000, 300000))

mean = np.mean(arr)
std_dev = np.std(arr)
min_val = np.min(arr)
max_val = np.max(arr)

print("медиана:", mean)
print("стандартное отклонение:", std_dev)
print("минимальное значение:", min_val)
print("максимальное значение:", max_val)

медиана: 1.3740721700036527e-05
стандартное отклонение: 0.9999551684850614
минимальное значение: -5.729057125528079
максимальное значение: 6.088302162511303


2. Посчитайте сумму квадратов элементов массива, созданного в задаче 1. Создайте массив `np.array` такого же размера и сравните скорость решения задачи с использование `da.array` и `np.array`

In [11]:
import numpy as np

arr = np.random.standard_normal((1000, 300000))

sum_of_squares = np.sum(np.square(arr))

print("Сумма квадратов элементов в массиве:", sum_of_squares)

Сумма квадратов элементов в массиве: 299974688.73309314


In [12]:
import dask.array as da


arr_dask = da.random.standard_normal((1000, 300000))

sum_of_squares_dask = da.sum(da.square(arr_dask)).compute()

print("Сумма квадратов элементов в массиве dask:", sum_of_squares_dask)

Сумма квадратов элементов в массиве dask: 300027689.8186557


In [13]:


import timeit
import numpy as np
import dask.array as da

arr_np = np.random.standard_normal((1000, 300000))
arr_dask = da.random.standard_normal((1000, 300000))
time_np = timeit.timeit(lambda: np.sum(np.square(arr_np)), number=0)
time_dask = timeit.timeit(lambda: np.sum(np.square(arr_dask)), number=0)
print(time_np)
print(time_dask)

3.6099999078942346e-07
2.0599998151737964e-07


3. Визуализируйте граф вычислений для задачи 12.

In [None]:
da.sum(arr**2).visualize()

## Лабораторная работа 11

In [22]:
import dask.array as da
import h5py
import numpy as np

1. Считайте датасет `recipe` из файла `minutes_n_ingredients_full.hdf5` в виде `dask.array`. Укажите аргумент `chunks=(100_000, 3)` при создании массива. Выведите на экран основную информацию о массиве.

In [23]:
recipe = da.from_array(h5py.File("minutes_n_ingredients_full.hdf5", 'r')['recipe'], chunks=(100_000, 3))
recipe
print("Привезенцев Семен")

Привезенцев Семен


In [24]:
print("Shape =",recipe.shape)
print("Size =",recipe.size)
print("Max =",recipe.max().compute())
print("Min =",recipe.min().compute())
print("Mean =",recipe.mean().compute())
print("Привезенцев Семен")

Shape = (2231637, 3)
Size = 6694911
Max = 2147483647
Min = 0
Mean = 375950.1723050538
Привезенцев Семен


2. Вычислите среднее значение по каждому столбцу, кроме первого. 

In [25]:
%%time
print(recipe[:, 1:].mean(axis=0).compute())
print("Привезенцев Семен")

[1004.20805176    5.4198008 ]
Привезенцев Семен
CPU times: user 91.9 ms, sys: 28.3 ms, total: 120 ms
Wall time: 53.2 ms


3. Исследуйте, как влияет значение аргумента `chunks` при создании `dask.array` на скорость выполнения операции поиска среднего. 

In [26]:
recipe_2 = da.from_array(h5py.File("minutes_n_ingredients_full.hdf5", 'r')['/recipe'], chunks=100)
recipe_3 = da.from_array(h5py.File("minutes_n_ingredients_full.hdf5", 'r')['/recipe'], chunks=99999999)
recipe_4 = da.from_array(h5py.File("minutes_n_ingredients_full.hdf5", 'r')['/recipe'], chunks=(10000, 10000))
print("Привезенцев Семен")

Привезенцев Семен


In [28]:
%%time
print(recipe_2[:, 1:].mean(axis=0).compute())
print("Привезенцев Семен")

[1004.20805176    5.4198008 ]
Привезенцев Семен
CPU times: user 13.1 s, sys: 1.86 s, total: 14.9 s
Wall time: 13.7 s


In [29]:
%%time
print(recipe_3[:, 1:].mean(axis=0).compute())
print("Привезенцев Семен")

[1004.20805176    5.4198008 ]
Привезенцев Семен
CPU times: user 56.4 ms, sys: 44 ms, total: 100 ms
Wall time: 98.9 ms


In [30]:
%%time
print(recipe_4[:, 1:].mean(axis=0).compute())
print("Привезенцев Семен")

[1004.20805176    5.4198008 ]
Привезенцев Семен
CPU times: user 178 ms, sys: 34.5 ms, total: 212 ms
Wall time: 143 ms


Чем больше чанков, тем быстрее выполняется код

4. Выберите рецепты, время выполнения которых меньше медианного значения

In [31]:
recipe[recipe[:, 1] < da.median(recipe[:, 1], axis=0).compute()].compute()
print("Привезенцев Семен")

Привезенцев Семен


5. Посчитайте количество каждого из возможных значений кол-ва ингредиентов

In [33]:
import pandas as pd

In [34]:
value, counts = da.unique(recipe[:, 2], return_counts=True)
display(pd.DataFrame({"value": value.compute(), "counts": counts.compute()}).set_index("value"))
print("Привезенцев Семен")

Unnamed: 0_level_0,counts
value,Unnamed: 1_level_1
1,222071
2,224158
3,229388
4,234948
5,240720
6,244360
7,247181
8,246747
9,246816
10,22430


Привезенцев Семен


6. Найдите максимальную продолжительность рецепта. Ограничьте максимальную продолжительность рецептов сверху значением, равному 75% квантилю.

In [35]:
print("Максимальная продолжительность рецепта =",da.max(recipe[:, 1]).compute())
print("Привезенцев Семен")

Максимальная продолжительность рецепта = 2147483647
Привезенцев Семен


In [37]:
recipe_75 = da.where(recipe[:, 1] > np.quantile(recipe[:, 1].compute(), 0.75), np.quantile(recipe[:, 1].compute(), 0.75), recipe[:, 1]).compute()

In [38]:
print("Максимальная продолжительность рецепта c ограничением =",da.max(recipe_75).compute())
print("Привезенцев Семен")

Максимальная продолжительность рецепта c ограничением = 49.0
Привезенцев Семен


7. Создайте массив `dask.array` из 2 чисел, содержащих ваши предпочтения относительно времени выполнения рецепта и кол-ва ингредиентов. Найдите наиболее похожий (в смысле $L_1$) рецепт из имеющихся в датасете.

In [39]:
arr = da.array([30, 5])
MAE = (abs(recipe[:, 1:].compute()[:, 0] - arr.compute()[0]) + abs(recipe[:, 1:].compute()[:, 1] - arr.compute()[1]))
print(recipe[np.argmin(MAE)].compute())
print("Привезенцев Семен")

[442705     30      5]
Привезенцев Семен


8. Работая с исходным файлом в формате `hdf5`, реализуйте алгоритм подсчета среднего значения в блочной форме и вычислите с его помощью среднее значение второго столбца в массиве.

Блочный алгоритм вычислений состоит из двух частей:
1. Загрузка фрагмента за фрагментом данных по `blocksize` элементов и проведение вычислений на этим фрагментом
2. Агрегация результатов вычислений на различных фрагментах для получения результата на уровне всего набора данных

Важно: при работе с `h5py` в память загружаются не все элементы, а только те, которые запрашиваются в данный момент

In [40]:
def blocksize():
    blocksize = 40000
    with h5py.File("minutes_n_ingredients_full.hdf5", 'r') as f:
        time = f["recipe"][:, 1]
        summa = 0
        count = 0
        for i in range(0, time.shape[0], blocksize):
            epoch = time[i: i+blocksize]
            summa += epoch.sum()
            count += epoch.shape[0]
    print("Среднее значение =",summa/count)
    print("Привезенцев Семен")

In [41]:
%%time
blocksize()
print("Привезенцев Семен")

Среднее значение = 1004.2080517575215
Привезенцев Семен
Привезенцев Семен
CPU times: user 29.1 ms, sys: 13.9 ms, total: 43 ms
Wall time: 40.1 ms
