# Параллельные вычисления

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* Макрушин С.В. Лекция "Параллельные вычисления"
* https://docs.python.org/3/library/multiprocessing.html
    * https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process
    * https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool
    * https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue
* https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html
* https://numpy.org/doc/stable/reference/generated/numpy.array_split.html
* https://nalepae.github.io/pandarallel/
    * https://github.com/nalepae/pandarallel/blob/master/docs/examples_windows.ipynb
    * https://github.com/nalepae/pandarallel/blob/master/docs/examples_mac_linux.ipynb

## Задачи для совместного разбора

In [24]:
# !pip install pandarallel

1. Посчитайте, сколько раз встречается буква "a" в файлах ["xaa", "xab", "xac", "xad"]. 

In [2]:
import multiprocessing

files = [f"{name}.txt" for name in ["xaa", "xab", "xac", "xad"]]

2. Выведите на экран слова из файла words_alpha, в которых есть две или более буквы "e" подряд.

In [28]:
import pandas as pd

words = (
    pd.read_csv("10_multiprocessing_data/words_alpha.txt", header=None)[0]
    .dropna()
    .sample(frac=1, replace=True)
)

## Лабораторная работа 10

__При решении данных задач не подразумевается использования циклов или генераторов Python в ходе работы с пакетами `numpy` и `pandas`, если в задании не сказано обратного. Решения задач, в которых для обработки массивов `numpy` или структур `pandas` используются явные циклы (без согласования с преподавателем), могут быть признаны некорректными и не засчитаны.__

In [1]:
import pandas as pd
import numpy as np
import multiprocessing as mp

1\. В каждой строке файла `tag_nsteps.csv` хранится информация о тэге рецепта и количестве шагов в этом рецепте в следующем виде:

```
tags,n_steps
hungarian,2
european,6
occasion,4
pumpkin,4
................
```

Всего в исходном файле хранится чуть меньше, чем 71 млн, строк. Разбейте файл `tag_nsteps.csv` на несколько (например, 8) примерно одинаковых по объему файлов c названиями `tag_nsteps_*.csv`, где вместо символа `*` указан номер очередного файла. Каждый файл имеет структуру, аналогичную оригинальному файлу (включая заголовок).

__Важно__: здесь и далее вы не можете загружать в память весь исходный файл сразу. 

In [2]:
n_files = 12

In [6]:
chunks = pd.read_csv(f'tag_nsteps.csv', chunksize=71_000_000//n_files)
for i, chunk in enumerate(chunks):
    chunk.to_csv(f'tag_nsteps_{i}.csv', index=False)

2\. Напишите функцию, которая принимает на вход название файла, созданного в результате решения задачи 1, считает для каждого тэга сумму по столбцу `n_steps` и количество строк c этим тэгом, и возвращает результат в виде словаря. Ожидаемый вид итогового словаря:

```
{
    '1-day-or-more': {'sum': 56616, 'count': 12752},
    '15-minutes-or-less': {'sum': 195413, 'count': 38898},
    '3-steps-or-less': {'sum': 187938, 'count': 39711},
    ....
}
```

Примените данную функцию к каждому файлу, полученному в задании 1, и соберите результат в виде списка словарей. Не используйте параллельных вычислений. 

Выведите на экран значение по ключу "30-minutes-or-less" для каждого из словарей.

In [7]:
def get_tag_sum_count_from_file(file: str) -> dict:
    df = pd.read_csv(file)
    df2 = df.groupby("tags")["n_steps"]
    df3 = df2.agg(sum = 'sum', count = 'count')
    return df3.to_dict('index')

In [10]:
res2 = [get_tag_sum_count_from_file(f"tag_nsteps_{i}.csv") for i in range(n_files)]

In [11]:
for dict_ in res2:
    print(dict_["30-minutes-or-less"])

{'sum': 234016, 'count': 30597}
{'sum': 230889, 'count': 30304}
{'sum': 234232, 'count': 30757}
{'sum': 237432, 'count': 31043}
{'sum': 230152, 'count': 30297}
{'sum': 233737, 'count': 30576}
{'sum': 229742, 'count': 30242}
{'sum': 234179, 'count': 30765}
{'sum': 232940, 'count': 30657}
{'sum': 231994, 'count': 30647}
{'sum': 232735, 'count': 30735}
{'sum': 221157, 'count': 29162}


3\. Напишите функцию, которая объединяет результаты обработки отдельных файлов. Данная функция принимает на вход список словарей, каждый из которых является результатом вызова функции `get_tag_sum_count_from_file` для конкретного файла, и агрегирует эти словари. Не используйте параллельных вычислений.

Процедура агрегации словарей имеет следующий вид:
$$d_{agg}[k] = \{sum: \sum_{i=1}^{n}d_{i}[k][sum], count: \sum_{i=1}^{n}d_{i}[k][count]\}$$
где $d_1, d_2, ..., d_n$- результат вызова функции `get_tag_sum_count_from_file` для конкретных файлов.

Примените данную функцию к результату выполнения задания 2. Выведите на экран результат для тэга "30-minutes-or-less".

In [8]:
def agg_results(tag_sum_count_list: list) -> dict:
    main_df = pd.DataFrame(columns=["index", "sum", "count"])
    for dict_ in tag_sum_count_list:
        temp = pd.DataFrame.from_dict(dict_, orient="index").reset_index()
        main_df = pd.concat([temp, main_df])
        res = main_df.groupby("index").sum().to_dict("index")
    return res

In [13]:
res3 = agg_results(res2)

In [14]:
res3["30-minutes-or-less"]

{'sum': 2783205, 'count': 365782}

4\. Напишите функцию, которая считает среднее значение количества шагов для каждого тэга в словаре, имеющего вид, аналогичный словарям в задаче 2, и возвращает результат в виде словаря . Используйте решения задач 1-3, чтобы получить среднее значение количества шагов каждого тэга для всего датасета, имея результаты обработки частей датасета и результат их агрегации. Выведите на экран результат для тэга "30-minutes-or-less".

Определите, за какое время задача решается для всего датасета. При замере времени учитывайте время расчета статистики для каждого файла, агрегации результатов и, собственно, вычисления средного. Временем, затрачиваемым на процедуру разбиения исходного файла можно пренебречь.

In [9]:
def get_tag_mean_n_steps(tag_sum_count: dict) -> dict:
    for tag,sum_count in tag_sum_count.items():
        tag_sum_count[tag] = sum_count["sum"] / sum_count["count"]
    return tag_sum_count

In [18]:
%%timeit
res4 = get_tag_mean_n_steps(agg_results([get_tag_sum_count_from_file(f"tag_nsteps_{i}.csv") for i in range(n_files)]))
res4["30-minutes-or-less"]

14.6 s ± 37.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [28]:
res4 = get_tag_mean_n_steps(agg_results([get_tag_sum_count_from_file(f"tag_nsteps_{i}.csv") for i in range(n_files)]))
res4["30-minutes-or-less"]

7.608917333275011

5\. Повторите решение задачи 4, распараллелив вызовы функции `get_tag_sum_count_from_file` для различных файлов с помощью `multiprocessing.Pool`. Для обработки каждого файла создайте свой собственный процесс. Выведите на экран результат для тэга "30-minutes-or-less". Определите, за какое время задача решается для всех файлов. При замере времени учитывайте время расчета статистики для каждого файла, агрегации результатов и, собственно, вычисления средного. Временем, затрачиваемым на процедуру разбиения исходного файла можно пренебречь.

In [15]:
%%file helpers.py
import pandas as pd
def get_tag_sum_count_from_file(file: str) -> dict:
    df = pd.read_csv(file)
    df2 = df.groupby("tags")["n_steps"]
    df3 = df2.agg(sum = 'sum', count = 'count')
    return df3.to_dict('index')

Overwriting helpers.py


In [16]:
import helpers

In [26]:
%%timeit
pool = mp.Pool(processes=n_files)
results = list(pool.map(helpers.get_tag_sum_count_from_file, [f'tag_nsteps_{i}.csv' for i in range(n_files)]))
res5 = get_tag_mean_n_steps(agg_results(results))

3.28 s ± 32.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [27]:
pool = mp.Pool(processes=n_files)
results = list(pool.map(helpers.get_tag_sum_count_from_file, [f'tag_nsteps_{i}.csv' for i in range(n_files)]))
res5 = get_tag_mean_n_steps(agg_results(results))
res5["30-minutes-or-less"]

7.608917333275011

6\. Повторите решение задачи 4, распараллелив вычисления функции `get_tag_sum_count_from_file` для различных файлов с помощью `multiprocessing.Process`. Для обработки каждого файла создайте свой собственный процесс. Для обмена данными между процессами используйте `multiprocessing.Queue`.

Выведите на экран результат для тэга "30-minutes-or-less". Определите, за какое время задача решается для всех файлов. При замере времени учитывайте время расчета статистики для каждого файла, агрегации результатов и, собственно, вычисления средного. Временем, затрачиваемым на процедуру разбиения исходного файла можно пренебречь.

In [3]:
%%file helpers2.py
import pandas as pd

def get_tag_sum_count_from_file(file: str, output) -> dict:
    df = pd.read_csv(file)
    df2 = df.groupby("tags")["n_steps"]
    df3 = df2.agg(sum = 'sum', count = 'count')
    output.put(df3.to_dict('index'))

Overwriting helpers2.py


In [4]:
import helpers2

In [12]:
%%timeit
output = mp.Queue()
[mp.Process(target=helpers2.get_tag_sum_count_from_file, args=(f"tag_nsteps_{i}.csv", output)).start() \
        for i in range(n_files)]
    
results = [output.get() for _ in range(n_files)]
res6 = get_tag_mean_n_steps(agg_results(results))

3.3 s ± 28 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [11]:
output = mp.Queue()
[mp.Process(target=helpers2.get_tag_sum_count_from_file, args=(f"tag_nsteps_{i}.csv", output)).start() \
        for i in range(n_files)]
    
results = [output.get() for _ in range(n_files)]
res6 = get_tag_mean_n_steps(agg_results(results))
res6["30-minutes-or-less"]

7.608917333275011

7\. Исследуйте, как влияет количество запущенных одновременно процессов на скорость решения задачи. Узнайте количество ядер вашего процессора $K$. Повторите решение задачи 1, разбив исходный файл на $\frac{K}{2}$, $K$ и $2K$ фрагментов. Для каждого из разбиений повторите решение задачи 5. Визуализируйте зависимость времени выполнения кода от количества файлов в разбиении. Сделайте вывод в виде текстового комментария.

In [21]:
n_files = mp.cpu_count()//2
chunks = pd.read_csv(f'tag_nsteps.csv', chunksize=71_000_000//n_files)
for i, chunk in enumerate(chunks):
    chunk.to_csv(f'tag_nsteps_{i}.csv', index=False)

In [22]:
%%timeit
pool = mp.Pool(processes=n_files)
results = list(pool.map(helpers.get_tag_sum_count_from_file, [f'tag_nsteps_{i}.csv' for i in range(n_files)]))
res5 = get_tag_mean_n_steps(agg_results(results))

3.5 s ± 111 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [23]:
n_files = mp.cpu_count()
chunks = pd.read_csv(f'tag_nsteps.csv', chunksize=71_000_000//n_files)
for i, chunk in enumerate(chunks):
    chunk.to_csv(f'tag_nsteps_{i}.csv', index=False)

In [24]:
%%timeit
pool = mp.Pool(processes=n_files)
results = list(pool.map(helpers.get_tag_sum_count_from_file, [f'tag_nsteps_{i}.csv' for i in range(n_files)]))
res5 = get_tag_mean_n_steps(agg_results(results))

3.27 s ± 31.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [25]:
n_files = mp.cpu_count()*2
chunks = pd.read_csv(f'tag_nsteps.csv', chunksize=71_000_000//n_files)
for i, chunk in enumerate(chunks):
    chunk.to_csv(f'tag_nsteps_{i}.csv', index=False)

In [26]:
%%timeit
pool = mp.Pool(processes=n_files)
results = list(pool.map(helpers.get_tag_sum_count_from_file, [f'tag_nsteps_{i}.csv' for i in range(n_files)]))
res5 = get_tag_mean_n_steps(agg_results(results))

4.04 s ± 13.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


K выгодный самый

8\. Напишите функцию `parallel_map`, которая принимает на вход серию `s` `pd.Series` и функцию одного аргумента `f` и поэлементно применяет эту функцию к серии, распараллелив вычисления при помощи пакета `multiprocessing`. Логика работы функции `parallel_map` должна включать следующие действия:
* разбиение исходной серии на $K$ частей, где $K$ - количество ядер вашего процессора;
* параллельное применение функции `f` к каждой части при помощи метода _серии_ `map` при помощи нескольких подпроцессов;
* объединение результатов работы подпроцессов в одну серию. 

In [27]:
%%file helpers3.py
import pandas as pd

def mapp(s: pd.Series, f: callable) -> pd.Series:
    return s.map(f)

Overwriting helpers3.py


In [28]:
import helpers3

In [29]:
import pandas as pd
import numpy as np
import multiprocessing as mp

In [30]:
s = pd.Series(range(6*100))
def parallel_map(s: pd.Series, f: callable) -> pd.Series: 
    lst = np.array_split(s, 6)
    
    pool = mp.Pool(processes=6)
    return pd.concat(pool.starmap(helpers3.mapp, [(part, f) for part in lst] ))

9\. Напишите функцию `f`, которая принимает на вход тэг и проверяет, удовлетворяет ли тэг следующему шаблону: `[любое число]-[любое слово]-or-less`. Возьмите любой фрагмент файла, полученный в задании 1, примените функцию `f` при помощи `parallel_map` к столбцу `tags` и посчитайте количество тэгов, подходящих под этот шаблон. Решите ту же задачу, воспользовавшись методом _серий_ `map`. Сравните время и результат выполнения двух решений.

In [31]:
%%file helpers4.py
import re
def f(tag: str) -> bool:
    return bool(re.match("\d+-\w+-or-less", str(tag)))

Overwriting helpers4.py


In [32]:
import helpers4

In [33]:
df = pd.read_csv("tag_nsteps_0.csv")

In [39]:
%%timeit
parallel_map(df.tags, helpers4.f).sum()

1.01 s ± 15.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [40]:
%%timeit
len(df.tags[df.tags.map(helpers4.f)])

1.9 s ± 9.09 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


10\. Используя пакет `pandarallel`, примените функцию `f` из задания 9 к столбцу `tags` таблицы, с которой вы работали этом задании. Посчитайте количество тэгов, подходящих под описанный шаблон. Измерьте время выполнения кода. Выведите на экран полученный результат.

In [36]:
from pandarallel import pandarallel

In [37]:
pandarallel.initialize()

INFO: Pandarallel will run on 6 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.

https://nalepae.github.io/pandarallel/troubleshooting/


In [41]:
%%timeit
df.tags.parallel_map(helpers4.f).sum()

1.38 s ± 23.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
# все таки разделение на потоки оказалось самым быстрым