# Параллельные вычисления

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* Макрушин С.В. Лекция "Параллельные вычисления"
* https://docs.python.org/3/library/multiprocessing.html
    * https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process
    * https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool
    * https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue
* https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html
* https://numpy.org/doc/stable/reference/generated/numpy.array_split.html
* https://nalepae.github.io/pandarallel/
    * https://github.com/nalepae/pandarallel/blob/master/docs/examples_windows.ipynb
    * https://github.com/nalepae/pandarallel/blob/master/docs/examples_mac_linux.ipynb

## Задачи для совместного разбора

In [None]:
# !pip install pandarallel

1. Посчитайте, сколько раз встречается буква "a" в файлах ["xaa", "xab", "xac", "xad"]. 

In [2]:
import multiprocessing

files = [f"10_multiprocessing_data/{name}.txt" for name in ["xaa", "xab", "xac", "xad"]]

2. Выведите на экран слова из файла words_alpha, в которых есть две или более буквы "e" подряд.

In [28]:
import pandas as pd

words = (
    pd.read_csv("10_multiprocessing_data/words_alpha.txt", header=None)[0]
    .dropna()
    .sample(frac=1, replace=True)
)

## Лабораторная работа 10

__При решении данных задач не подразумевается использования циклов или генераторов Python в ходе работы с пакетами `numpy` и `pandas`, если в задании не сказано обратного. Решения задач, в которых для обработки массивов `numpy` или структур `pandas` используются явные циклы (без согласования с преподавателем), могут быть признаны некорректными и не засчитаны.__

1\. В каждой строке файла `tag_nsteps.csv` хранится информация о тэге рецепта и количестве шагов в этом рецепте в следующем виде:

```
tags,n_steps
hungarian,2
european,6
occasion,4
pumpkin,4
................
```

Всего в исходном файле хранится чуть меньше, чем 71 млн, строк. Разбейте файл `tag_nsteps.csv` на несколько (например, 8) примерно одинаковых по объему файлов c названиями `tag_nsteps_*.csv`, где вместо символа `*` указан номер очередного файла. Каждый файл имеет структуру, аналогичную оригинальному файлу (включая заголовок).

__Важно__: здесь и далее вы не можете загружать в память весь исходный файл сразу. 

In [1]:
import pandas as pd

In [27]:
k = 1
for i in pd.read_csv('tag_nsteps.csv', chunksize= 8_875_000, iterator= True):
    i.to_csv(f'tag_nsteps_{k}.csv',
            index = False)
    k+= 1 

In [4]:
71_000_000 / 8

8875000.0

2\. Напишите функцию, которая принимает на вход название файла, созданного в результате решения задачи 1, считает для каждого тэга сумму по столбцу `n_steps` и количество строк c этим тэгом, и возвращает результат в виде словаря. Ожидаемый вид итогового словаря:

```
{
    '1-day-or-more': {'sum': 56616, 'count': 12752},
    '15-minutes-or-less': {'sum': 195413, 'count': 38898},
    '3-steps-or-less': {'sum': 187938, 'count': 39711},
    ....
}
```

Примените данную функцию к каждому файлу, полученному в задании 1, и соберите результат в виде списка словарей. Не используйте параллельных вычислений. 

Выведите на экран значение по ключу "30-minutes-or-less" для каждого из словарей.

In [125]:
def get_tag_sum_count_from_file(file: str): # -> dict:
    return pd.read_csv(file).groupby('tags').agg(sum_ = ('n_steps','sum'),count=('tags','count')).to_dict('index')
    
    

In [127]:
file_names = [f'tag_nsteps_{i}.csv' for i in range(1,9)]
a = list(map(get_tag_sum_count_from_file, file_names))
a


[{'1-day-or-more': {'sum_': 56616, 'count': 12752},
  '15-minutes-or-less': {'sum_': 195413, 'count': 38898},
  '3-steps-or-less': {'sum_': 187938, 'count': 39711},
  '30-minutes-or-less': {'sum_': 348943, 'count': 45605},
  '4-hours-or-less': {'sum_': 429827, 'count': 42683},
  '5-ingredients-or-less': {'sum_': 179724, 'count': 33842},
  '60-minutes-or-less': {'sum_': 522209, 'count': 55224},
  'Throw the ultimate fiesta with this sopaipillas recipe from Food.com.': {'sum_': 39985,
   'count': 11451},
  'a1-sauce': {'sum_': 40025, 'count': 11357},
  'african': {'sum_': 57174, 'count': 13138},
  'american': {'sum_': 232336, 'count': 30755},
  'amish-mennonite': {'sum_': 40700, 'count': 11419},
  'angolan': {'sum_': 39530, 'count': 11285},
  'appetizers': {'sum_': 150075, 'count': 24009},
  'apples': {'sum_': 69464, 'count': 14320},
  'april-fools-day': {'sum_': 39256, 'count': 11228},
  'argentine': {'sum_': 40794, 'count': 11393},
  'artichoke': {'sum_': 40371, 'count': 11546},
  'asi

3\. Напишите функцию, которая объединяет результаты обработки отдельных файлов. Данная функция принимает на вход список словарей, каждый из которых является результатом вызова функции `get_tag_sum_count_from_file` для конкретного файла, и агрегирует эти словари. Не используйте параллельных вычислений.

Процедура агрегации словарей имеет следующий вид:
$$d_{agg}[k] = \{sum: \sum_{i=1}^{n}d_{i}[k][sum], count: \sum_{i=1}^{n}d_{i}[k][count]\}$$
где $d_1, d_2, ..., d_n$- результат вызова функции `get_tag_sum_count_from_file` для конкретных файлов.

Примените данную функцию к результату выполнения задания 2. Выведите на экран результат для тэга "30-minutes-or-less".

In [311]:
def agg_results(tag_sum_count_list: list) -> dict:
    b = {}
    for i in tag_sum_count_list:
        for k,v in i.items():
            if k in b:
                b[k] ={l: b[k].get(l) + i[k].get(l) for l in i[k]}  
            else:
                b[k] = v
    return b

In [312]:
agg_results(a)['30-minutes-or-less']

{'sum_': 2783205, 'count': 365782}

4\. Напишите функцию, которая считает среднее значение количества шагов для каждого тэга в словаре, имеющего вид, аналогичный словарям в задаче 2, и возвращает результат в виде словаря . Используйте решения задач 1-3, чтобы получить среднее значение количества шагов каждого тэга для всего датасета, имея результаты обработки частей датасета и результат их агрегации. Выведите на экран результат для тэга "30-minutes-or-less".

Определите, за какое время задача решается для всего датасета. При замере времени учитывайте время расчета статистики для каждого файла, агрегации результатов и, собственно, вычисления средного. Временем, затрачиваемым на процедуру разбиения исходного файла можно пренебречь.

In [2]:
def get_tag_sum_count_from_file(file: str): # -> dict:
    return pd.read_csv(file).groupby('tags').mean() #.agg(sum_ = ('n_steps','sum'),count=('tags','count')).to_dict('index')
    
    

In [17]:
file_names = [f'tag_nsteps_{i}.csv' for i in range(1,9)]
a = list(map(get_tag_sum_count_from_file, file_names))
a

[                       n_steps
 tags                          
 1-day-or-more         4.439774
 15-minutes-or-less    5.023729
 3-steps-or-less       4.732643
 30-minutes-or-less    7.651420
 4-hours-or-less      10.070215
 ...                        ...
 wings                 3.721457
 winter                5.439827
 yams-sweet-potatoes   4.003736
 yeast                 5.139542
 zucchini              3.591716
 
 [551 rows x 1 columns],
                        n_steps
 tags                          
 1-day-or-more         4.489310
 15-minutes-or-less    4.944426
 3-steps-or-less       4.729855
 30-minutes-or-less    7.604152
 4-hours-or-less      10.157638
 ...                        ...
 wings                 3.695301
 winter                5.490873
 yams-sweet-potatoes   4.038069
 yeast                 5.155676
 zucchini              3.591789
 
 [551 rows x 1 columns],
                        n_steps
 tags                          
 1-day-or-more         4.429929
 15-minutes-or-les

In [None]:
pd.read_csv('tag_nsteps_1.csv')[pd.read_csv('tag_nsteps_1.csv')['tags'] == '1-day-or-more'].mean()

In [16]:
sum(a)

Unnamed: 0_level_0,n_steps
tags,Unnamed: 1_level_1
1-day-or-more,4.439774
15-minutes-or-less,5.023729
3-steps-or-less,4.732643
30-minutes-or-less,7.651420
4-hours-or-less,10.070215
...,...
wings,3.721457
winter,5.439827
yams-sweet-potatoes,4.003736
yeast,5.139542


In [None]:
def get_tag_mean_n_steps(tag_sum_count: dict) -> dict:
    pass

5\. Повторите решение задачи 4, распараллелив вызовы функции `get_tag_sum_count_from_file` для различных файлов с помощью `multiprocessing.Pool`. Для обработки каждого файла создайте свой собственный процесс. Выведите на экран результат для тэга "30-minutes-or-less". Определите, за какое время задача решается для всех файлов. При замере времени учитывайте время расчета статистики для каждого файла, агрегации результатов и, собственно, вычисления средного. Временем, затрачиваемым на процедуру разбиения исходного файла можно пренебречь.

6\. Повторите решение задачи 4, распараллелив вычисления функции `get_tag_sum_count_from_file` для различных файлов с помощью `multiprocessing.Process`. Для обработки каждого файла создайте свой собственный процесс. Для обмена данными между процессами используйте `multiprocessing.Queue`.

Выведите на экран результат для тэга "30-minutes-or-less". Определите, за какое время задача решается для всех файлов. При замере времени учитывайте время расчета статистики для каждого файла, агрегации результатов и, собственно, вычисления средного. Временем, затрачиваемым на процедуру разбиения исходного файла можно пренебречь.

7\. Исследуйте, как влияет количество запущенных одновременно процессов на скорость решения задачи. Узнайте количество ядер вашего процессора $K$. Повторите решение задачи 1, разбив исходный файл на $\frac{K}{2}$, $K$ и $2K$ фрагментов. Для каждого из разбиений повторите решение задачи 5. Визуализируйте зависимость времени выполнения кода от количества файлов в разбиении. Сделайте вывод в виде текстового комментария.

8\. Напишите функцию `parallel_map`, которая принимает на вход серию `s` `pd.Series` и функцию одного аргумента `f` и поэлементно применяет эту функцию к серии, распараллелив вычисления при помощи пакета `multiprocessing`. Логика работы функции `parallel_map` должна включать следующие действия:
* разбиение исходной серии на $K$ частей, где $K$ - количество ядер вашего процессора;
* параллельное применение функции `f` к каждой части при помощи метода _серии_ `map` при помощи нескольких подпроцессов;
* объединение результатов работы подпроцессов в одну серию. 

In [None]:
def parallel_map(s: pd.Series, f: callable) -> pd.Series:
    pass

9\. Напишите функцию `f`, которая принимает на вход тэг и проверяет, удовлетворяет ли тэг следующему шаблону: `[любое число]-[любое слово]-or-less`. Возьмите любой фрагмент файла, полученный в задании 1, примените функцию `f` при помощи `parallel_map` к столбцу `tags` и посчитайте количество тэгов, подходящих под этот шаблон. Решите ту же задачу, воспользовавшись методом _серий_ `map`. Сравните время и результат выполнения двух решений.

In [None]:
def f(tag: str) -> bool:
    pass

10\. Используя пакет `pandarallel`, примените функцию `f` из задания 9 к столбцу `tags` таблицы, с которой вы работали этом задании. Посчитайте количество тэгов, подходящих под описанный шаблон. Измерьте время выполнения кода. Выведите на экран полученный результат.