# Dask Array (1)

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* Макрушин С.В. Лекция "Dask"
* https://docs.dask.org/en/latest/array.html
* https://docs.dask.org/en/stable/array-chunks.html
* https://en.wikipedia.org/wiki/Taxicab_geometry
* https://docs.h5py.org/en/stable/

## Задачи для совместного разбора

1. Создайте массив размерностью 1000 на 300000, заполненный числами из стандартного нормального распределения. Исследуйте основные характеристики полученного массива.

2. Посчитайте сумму квадратов элементов массива, созданного в задаче 1. Создайте массив `np.array` такого же размера и сравните скорость решения задачи с использование `da.array` и `np.array`

## Лабораторная работа 7

__При решении данных задач не подразумевается использования циклов или генераторов Python в ходе работы с пакетами `numpy`, `pandas` и `dask`, если в задании не сказано обратного. Решения задач, в которых для обработки массивов `numpy`, структур `pandas` или структур `dask` используются явные циклы (без согласования с преподавателем), могут быть признаны некорректными и не засчитаны.__

В ходе выполнения все операции вычислений (расчет средних значений, расчет косинусной близости и т.д.) проводятся над `dask.array` и средствами пакета `dask`, если в задании не сказано обратного. Переход от `dask.array` к `numpy.array` или `pd.DataFrame` возможен исключительно для демонстрации результата в конце решения задачи. Если в задаче используются результаты выполнения предыдущих задач, то подразумевается, что вы используете результаты в виде `dask.array` (то есть то, что было получено до вызова `compute`, а не после).

In [4]:
import dask.array as da
import h5py
import numpy as np
import pandas as pd
import dask

In [3]:
arr = da.random.standard_normal(size=(1000, 300_000))
arr.to_hdf5("./demo.hdf5", "/smart")

In [15]:
fp = h5py.File("./demo.hdf5", "r")
fp.keys()
smart = fp["smart"]
smart
fp.close()

<p class="task" id="1"></p>

1\. Считайте датасет `embeddings` из файла `recipe_embeddings.h5` в виде `dask.array`. Выведите на экран основную информацию о массиве: размер, количество векторов $M$ (количество строк в массиве), размерность каждого вектора $N$ (количество столбцов в массиве), тип, количество и размер сегментов. 

In [5]:
f1 = h5py.File("recipe_embeddings.h5", "r")

In [6]:
f1.keys()

<KeysViewHDF5 ['embeddings', 'mask']>

In [7]:
emb = da.from_array(f1['embeddings'])
emb

Unnamed: 0,Array,Chunk
Bytes,1.39 GiB,119.02 MiB
Shape,"(1200000, 312)","(100000, 312)"
Count,13 Tasks,12 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 1.39 GiB 119.02 MiB Shape (1200000, 312) (100000, 312) Count 13 Tasks 12 Chunks Type float32 numpy.ndarray",312  1200000,

Unnamed: 0,Array,Chunk
Bytes,1.39 GiB,119.02 MiB
Shape,"(1200000, 312)","(100000, 312)"
Count,13 Tasks,12 Chunks
Type,float32,numpy.ndarray


In [8]:
M, N = emb.shape 
M, N

(1200000, 312)

* **M = 120000**
* **N = 312**
* **тип - float32**
* **количество - 13**
* **размер сегментов - (100000, 312)**

<p class="task" id="2"></p>

2\. Посчитайте и выведите на экран среднее значение всех элементов массива. Исследуйте, как влияет значение аргумента `chunks` при создании `dask.array` на скорость выполнения операции поиска среднего. 

Пусть $M$ - количество строк в массиве, $N$ - количество столбцов в массиве, `chunks=(r,c)`. Сравните несколько вариантов:
* $r=M$, $с \ll N$ , 
* $r \ll M$, $c=N$ 
* $r = M$, $c = N$ 
* значения $r, c$ по умолчанию.

Выберите наиболее оптимальные значения $r$ и  $c$ в смысле скорости вычислений и далее продолжайте работу с ними.

In [9]:
M*0.08, N*0.08

(96000.0, 24.96)

In [10]:
%%time
emb2_1 = da.from_array(f1['embeddings'], chunks=(M,25))
mean1 = emb2_1.mean().compute()
mean1

Wall time: 6.82 s


0.002377755

In [11]:
%%time
emb2_2 = da.from_array(f1['embeddings'], chunks=(96_000,N))
mean2 = emb2_2.mean().compute()
mean2

Wall time: 550 ms


0.0023777564

In [12]:
%%time
emb2_3 = da.from_array(f1['embeddings'], chunks=(M,N))
mean3 = emb2_3.mean().compute()
mean3

Wall time: 723 ms


0.0023777678

**Наиболее оптимальные значения - (96 000, 312)**

<p class="task" id="3"></p>

3\. Опишите пространство, в котором расположены эмбеддинги, посчитав минимальное и максимальное значение для каждой из координат. Сведите результаты в таблицу `pd.DataFrame`, состоящую из двух строк и 312 столбцов. Задайте индексы строк "min" и "max". Названия столбцов сделайте вида $e_i$. Выведите полученную таблицу на экран.

Решите задачу двумя способами. В первом варианте сделайте два вызова метода `compute` для расчета каждого из векторов максимальных и минимальных значений. Во втором варианте сделайте один вызов функции `dask.compute` для одновременного расчета двух векторов. Сравните время выполнения двух решений.

In [13]:
emb2_2

Unnamed: 0,Array,Chunk
Bytes,1.39 GiB,114.26 MiB
Shape,"(1200000, 312)","(96000, 312)"
Count,14 Tasks,13 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 1.39 GiB 114.26 MiB Shape (1200000, 312) (96000, 312) Count 14 Tasks 13 Chunks Type float32 numpy.ndarray",312  1200000,

Unnamed: 0,Array,Chunk
Bytes,1.39 GiB,114.26 MiB
Shape,"(1200000, 312)","(96000, 312)"
Count,14 Tasks,13 Chunks
Type,float32,numpy.ndarray


In [14]:
%%time
mns = emb2_2.min(axis=0).compute()
mxs = emb2_2.max(axis=0).compute()
columns = [f'e_{i}' for i in range(312)]
indexes = ['min', 'max']
pd.DataFrame([mns,mxs], columns=columns, index=indexes)

Wall time: 1.19 s


Unnamed: 0,e_0,e_1,e_2,e_3,e_4,e_5,e_6,e_7,e_8,e_9,...,e_302,e_303,e_304,e_305,e_306,e_307,e_308,e_309,e_310,e_311
min,-0.132803,-0.149056,-0.094468,-0.191697,-0.114229,-0.114341,-0.096039,-0.115178,-0.157275,-0.116715,...,-0.103254,-0.122285,-0.149789,-0.127703,-0.094802,-0.11969,-0.141425,-0.123732,-0.081543,-0.227348
max,0.135038,0.076125,0.157854,0.030987,0.101192,0.111774,0.147497,0.173821,0.099808,0.115573,...,0.119518,0.197589,0.113135,0.13649,0.162921,0.099021,0.086653,0.158176,0.166968,0.048967


In [15]:
%%time
mns = emb2_2.min(axis=0)
mxs = emb2_2.max(axis=0)
mns, mxs = dask.compute(mns,mxs)
columns = [f'e_{i}' for i in range(312)]
indexes = ['min', 'max']
pd.DataFrame([mns,mxs], columns=columns, index=indexes)

Wall time: 618 ms


Unnamed: 0,e_0,e_1,e_2,e_3,e_4,e_5,e_6,e_7,e_8,e_9,...,e_302,e_303,e_304,e_305,e_306,e_307,e_308,e_309,e_310,e_311
min,-0.132803,-0.149056,-0.094468,-0.191697,-0.114229,-0.114341,-0.096039,-0.115178,-0.157275,-0.116715,...,-0.103254,-0.122285,-0.149789,-0.127703,-0.094802,-0.11969,-0.141425,-0.123732,-0.081543,-0.227348
max,0.135038,0.076125,0.157854,0.030987,0.101192,0.111774,0.147497,0.173821,0.099808,0.115573,...,0.119518,0.197589,0.113135,0.13649,0.162921,0.099021,0.086653,0.158176,0.166968,0.048967


**Во втором варианте решения время меньше в 1.6 раз**

<p class="task" id="4"></p>

4\. Датасет `embeddings` представляет собой набор 312-мерных векторов $x_i, i=0, 1, ... M-1$ Найдите вектор $x \ne x_{256}$ из набора данных, ближайший к вектору $x_{256}$ в смысле метрики $L_1$. Выведите на экран первые 10 координат вектора $x$.

$$d_1(\textbf{x},\textbf{y})=\sum_{k=1}^{n}{|x_i - y_i|}, \textbf{x}, \textbf{y} \in \mathbb{R}^n$$

In [16]:
%%time
x_256 = emb2_2[256,:]
x_without256 = (emb2_2[(emb2_2!=x_256).all(axis=1)]).compute_chunk_sizes()
index_x_closest = abs(x_without256-x_256).sum(axis=1).argmin()
x_closest = x_without256[index_x_closest]
x_closest[:10].compute()

Wall time: 2.14 s


array([ 0.0331987 , -0.03648246,  0.06629294, -0.0850755 , -0.04708353,
        0.00130241,  0.00259956,  0.01916818, -0.00985817, -0.04410348],
      dtype=float32)

In [17]:
%%time
x_256 = emb2_2[256,:]
index_x_closest = abs(emb2_2-x_256).sum(axis=1).argtopk(2)[0]
x_closest = x_without256[index_x_closest]
x_closest[:10].compute()

Wall time: 1.3 s


array([ 0.0387719 , -0.04352599,  0.04163509, -0.09189776,  0.03607241,
        0.0422313 ,  0.03730326,  0.00767737, -0.01964515, -0.00817577],
      dtype=float32)

<p class="task" id="5"></p>

5\. Рецепты разбиты на 4 группы. Загрузите маску для разбиения на группы из датасета `mask` из файла `recipe_embeddings.h5` в виде `dask.array`. Для каждой группы посчитайте и выведите на экран максимальное значение нормы $\ell_1$ векторов рецептов, принадлежащих к этой группе. 

Подсказка: закодируйте маску принадлежности к группе при помощи метода кодирования one-hot encoding и воспользуйтесь механизмом распространения.

$$\ell_1: ||\textbf{x}||_1=\sum_{k=1}^{n}{|x_k|}, \textbf{x} \in \mathbb{R}^n$$

In [18]:
from sklearn.preprocessing import OneHotEncoder

In [19]:
mask = da.from_array(f1['mask'])
mask

Unnamed: 0,Array,Chunk
Bytes,9.16 MiB,9.16 MiB
Shape,"(1200000,)","(1200000,)"
Count,2 Tasks,1 Chunks
Type,int64,numpy.ndarray
"Array Chunk Bytes 9.16 MiB 9.16 MiB Shape (1200000,) (1200000,) Count 2 Tasks 1 Chunks Type int64 numpy.ndarray",1200000  1,

Unnamed: 0,Array,Chunk
Bytes,9.16 MiB,9.16 MiB
Shape,"(1200000,)","(1200000,)"
Count,2 Tasks,1 Chunks
Type,int64,numpy.ndarray


In [20]:
enc = OneHotEncoder()
mask_encoded = da.from_array(enc.fit_transform(mask.reshape(-1,1)).toarray())
mask_encoded

Unnamed: 0,Array,Chunk
Bytes,36.62 MiB,36.62 MiB
Shape,"(1200000, 4)","(1200000, 4)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 36.62 MiB 36.62 MiB Shape (1200000, 4) (1200000, 4) Count 1 Tasks 1 Chunks Type float64 numpy.ndarray",4  1200000,

Unnamed: 0,Array,Chunk
Bytes,36.62 MiB,36.62 MiB
Shape,"(1200000, 4)","(1200000, 4)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray


In [21]:
l1_list = abs(emb2_2).sum(axis=1).reshape(-1,1)
a,b = np.broadcast_arrays(mask_encoded, l1_list)
(a*b).max(axis=0).compute()

array([13.31967735, 13.32409477, 13.31525993, 13.31915665])

In [22]:
f1.close()

<p class="task" id="6"></p>

6\. Работая с исходным файлом в формате `hdf`, реализуйте алгоритм подсчета среднего вектора датасета в блочной форме.

Блочный алгоритм вычислений состоит из двух частей:
1. Загрузка фрагмента за фрагментом данных и проведение вычислений над этим фрагментом
2. Агрегация результатов вычислений на различных фрагментах для получения результата на уровне всего набора данных

Важно: при работе с `hdf` в память загружаются не все элементы, а только те, которые запрашиваются в данный момент. При работе с `hdf` вы можете работать с массивами `numpy.array`. Для итерации по сегментам файла допускается использование циклов.

In [23]:
f7 = h5py.File("recipe_embeddings.h5", "r")

In [31]:
%%time
block_size = 100_000
mean_vectors = []
for i in range(12):
    block = np.array(f7['embeddings'][i*block_size:(i+1)*block_size])
    mean_vectors.extend(block.mean(axis=0))

Wall time: 970 ms


In [32]:
len(mean_vectors), mean_vectors[:10]

(3744,
 [0.00096760504,
  -0.034023467,
  0.04670283,
  -0.087452225,
  -0.009655452,
  0.0054168557,
  0.027200062,
  0.03748824,
  -0.02362777,
  -0.011074276])

<p class="task" id="7"></p>

7\. Решите задачу 6, распараллелив вычисления при помощи `ThreadPool`. Сравните время и результаты решения работы вашего алгоритма с реализацией поиска среднего вектора из `dask`. 

In [33]:
def process(i):
    block_size = 100_000
    block = np.array(f7['embeddings'][i*block_size:(i+1)*block_size])
    return block.mean(axis=0)

In [34]:
from multiprocessing.pool import ThreadPool

In [35]:
%%time
with ThreadPool(processes=12) as pool:
    res = pool.map(process, range(12))

Wall time: 698 ms


In [36]:
res

[array([ 9.67605039e-04, -3.40234675e-02,  4.67028283e-02, -8.74522254e-02,
        -9.65545233e-03,  5.41685568e-03,  2.72000618e-02,  3.74882407e-02,
        -2.36277692e-02, -1.10742757e-02,  2.61751711e-02,  8.81028827e-03,
         1.82223264e-02,  5.93920834e-02,  2.72670165e-02, -6.50585070e-03,
         3.03692408e-02,  1.04516782e-02,  1.93924066e-02,  1.43489584e-01,
        -2.50518369e-03, -5.92547795e-03, -3.19994427e-02, -4.11539562e-02,
         8.63013938e-02,  2.80228127e-02, -2.43750140e-02, -4.51874128e-03,
         1.09994765e-02,  3.78023088e-02, -1.07476339e-02, -1.56985633e-02,
         1.13027880e-03, -2.19424982e-02,  7.05337711e-03,  5.16469479e-02,
        -9.19271086e-04, -3.82208414e-02, -7.48995990e-02,  1.79542229e-02,
        -6.01696111e-02,  1.52892023e-01,  7.49480352e-02, -3.78704853e-02,
        -2.16874983e-02,  3.36373993e-03,  3.78066152e-02, -4.40908559e-02,
         4.41280268e-02, -3.56657431e-02,  8.00017640e-03, -3.51438038e-02,
        -4.2

In [38]:
%%time
emb7 = da.from_array(f7['embeddings'])
mean_vectors_dask = emb7.mean(axis=0)
len(mean_vectors_dask), mean_vectors_dask.compute()

Wall time: 548 ms


(312,
 array([ 9.75613308e-04, -3.39259915e-02,  4.66551557e-02, -8.74476507e-02,
        -9.71177034e-03,  5.51532954e-03,  2.71454044e-02,  3.75370905e-02,
        -2.35138610e-02, -1.10451411e-02,  2.61558332e-02,  8.86423327e-03,
         1.82165187e-02,  5.94256371e-02,  2.72058621e-02, -6.64571533e-03,
         3.04658655e-02,  1.03974920e-02,  1.93780344e-02,  1.43449485e-01,
        -2.46166624e-03, -5.94153255e-03, -3.18063907e-02, -4.10590284e-02,
         8.61882269e-02,  2.80386396e-02, -2.43214611e-02, -4.59721498e-03,
         1.09600713e-02,  3.78382765e-02, -1.06729204e-02, -1.57453716e-02,
         1.08159299e-03, -2.18519215e-02,  7.02830730e-03,  5.17747067e-02,
        -9.01941909e-04, -3.80985737e-02, -7.50487596e-02,  1.78839285e-02,
        -6.01360276e-02,  1.52848035e-01,  7.50525817e-02, -3.78861800e-02,
        -2.17200909e-02,  3.46800871e-03,  3.78258862e-02, -4.40570116e-02,
         4.40716483e-02, -3.56613398e-02,  8.02602526e-03, -3.51645462e-02,
      