# Параллельные вычисления

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* Макрушин С.В. Лекция "Параллельные вычисления"
* https://docs.python.org/3/library/multiprocessing.html
    * https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process
    * https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool
    * https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue
* https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html
* https://numpy.org/doc/stable/reference/generated/numpy.array_split.html
* https://nalepae.github.io/pandarallel/
    * https://github.com/nalepae/pandarallel/blob/master/docs/examples_windows.ipynb
    * https://github.com/nalepae/pandarallel/blob/master/docs/examples_mac_linux.ipynb

## Задачи для совместного разбора

In [22]:
!pip install pandarallel

Collecting pandarallel
  Using cached pandarallel-1.6.3-py3-none-any.whl
Collecting dill>=0.3.1
  Using cached dill-0.3.6-py3-none-any.whl (110 kB)
Installing collected packages: dill, pandarallel
Successfully installed dill-0.3.6 pandarallel-1.6.3


1. Посчитайте, сколько раз встречается буква "a" в файлах ["xaa", "xab", "xac", "xad"]. 

In [2]:
import multiprocessing

files = [f"10_multiprocessing_data/{name}.txt" for name in ["xaa", "xab", "xac", "xad"]]

In [3]:
from collections import Counter

def count_a(file):
    with open(file) as fp:
        text = fp.read().lower()
    res = Counter(text)["a"]
    print(file, res)
    return res

In [4]:
%%time
[count_a(f) for f in files]


10_multiprocessing_data/xaa.txt 2599627
10_multiprocessing_data/xab.txt 2605911
10_multiprocessing_data/xac.txt 2599868
10_multiprocessing_data/xad.txt 1460452
CPU times: total: 15.5 s
Wall time: 15.6 s


[2599627, 2605911, 2599868, 1460452]

In [5]:
%%file count_a.py
from collections import Counter

def count_a(file):
    with open(file) as fp:
        text = fp.read().lower()
    res = Counter(text)["a"]
    print(file, res)
    return res


Overwriting count_a.py


In [6]:
from count_a import count_a

In [7]:
%%time
with multiprocessing.Pool(processes=4) as pool:
    res = pool.map(count_a, files)
res

CPU times: total: 15.6 ms
Wall time: 5.21 s


[2599627, 2605911, 2599868, 1460452]

In [8]:
%%file count_a_q.py
from collections import Counter

def count_a_q(file, queue):
    with open(file) as fp:
        text = fp.read().lower()
    res = Counter(text)["a"]
    print(file, res)
    queue.put(res)

Overwriting count_a_q.py


In [9]:
from count_a_q import count_a_q

In [10]:
%%time
ps = []
queue = multiprocessing.Queue()

for f in files:
    p = multiprocessing.Process(target=count_a_q, args=(f, queue))
    ps.append(p)
    p.start()

rs = []
while len(rs) < 4:
    if not queue.empty():
        rs.append(queue.get())
    
for p in ps:
    p.join()

rs

CPU times: total: 5.05 s
Wall time: 5.07 s


[1460452, 2599868, 2605911, 2599627]

2. Выведите на экран слова из файла words_alpha, в которых есть две или более буквы "e" подряд.

In [151]:
import pandas as pd

words = (
    pd.read_csv("10_multiprocessing_data/words_alpha.txt", header=None)[0]
    .dropna()
    .sample(frac=1, replace=True)
)


In [12]:
words.size

370103

In [13]:
import re

def f(s): 
    return bool(re.findall(r"e{2,}", s))


In [14]:
%%file f.py
import re

def f(s): 
    return bool(re.findall(r"e{2,}", s))


Overwriting f.py


In [23]:
import pandarallel 

In [24]:
from f import f

from pandarallel import pandarallel 
pandarallel.initialize()


INFO: Pandarallel will run on 4 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.

https://nalepae.github.io/pandarallel/troubleshooting/


In [25]:
%%time
words[words.map(f)].size

CPU times: total: 672 ms
Wall time: 672 ms


7174

In [26]:
%%time
words[words.parallel_map(f)].size

  yield data[chunk_]


CPU times: total: 281 ms
Wall time: 2.92 s


7174

In [27]:
pd.__version__

'1.5.1'

## Лабораторная работа 10

__При решении данных задач не подразумевается использования циклов или генераторов Python в ходе работы с пакетами `numpy` и `pandas`, если в задании не сказано обратного. Решения задач, в которых для обработки массивов `numpy` или структур `pandas` используются явные циклы (без согласования с преподавателем), могут быть признаны некорректными и не засчитаны.__

1\. В каждой строке файла `tag_nsteps.csv` хранится информация о тэге рецепта и количестве шагов в этом рецепте в следующем виде:

```
tags,n_steps
hungarian,2
european,6
occasion,4
pumpkin,4
................
```

Всего в исходном файле хранится чуть меньше, чем 71 млн, строк. Разбейте файл `tag_nsteps.csv` на несколько (например, 8) примерно одинаковых по объему файлов c названиями `tag_nsteps_*.csv`, где вместо символа `*` указан номер очередного файла. Каждый файл имеет структуру, аналогичную оригинальному файлу (включая заголовок).

__Важно__: здесь и далее вы не можете загружать в память весь исходный файл сразу. 

In [21]:
pwd

'C:\\Users\\Артём\\OneDrive - ФГОБУ ВО Финансовый университет при Правительстве РФ\\Учёба\\3 курс\\Технологии обработки BD\\ТОБД22-ПМ20-Материалы к семинарам\\10_multiprocessing'

In [15]:
import pandas as pd
import numpy as np

### Всего строк в файле:

In [16]:
with open('C:\\Users\\Артём\\OneDrive - ФГОБУ ВО Финансовый университет при Правительстве РФ\\Учёба\\3 курс\\Технологии обработки BD\\ТОБД22-ПМ20-Материалы к семинарам\\10_multiprocessing\\10_multiprocessing_data\\tag_nsteps.csv'
) as fp:
    print(sum(1 for _ in fp))

70695586


### Я решил разбить файл на 10 файлов. Девять файлов будут иметь `7069558` строк, а десятый `7069564` строк

In [10]:
7069558 * 9 + 7069564

70695586

### 1 файл:

In [73]:
file_1 = pd.read_csv('C:\\Users\\Артём\\OneDrive - ФГОБУ ВО Финансовый университет при Правительстве РФ\\Учёба\\3 курс\\Технологии обработки BD\\ТОБД22-ПМ20-Материалы к семинарам\\10_multiprocessing\\10_multiprocessing_data\\tag_nsteps.csv', 
                     nrows=7069558,
                     dtype={'tags': 'str', 'n_steps': np.int64})

file_1.to_csv('tag_nsteps_1.csv', sep=',', encoding='utf-8', index=False)

### Со 2 по 9 файлы:

In [75]:
for i in range(2, 10):
    file = pd.read_csv('C:\\Users\\Артём\\OneDrive - ФГОБУ ВО Финансовый университет при Правительстве РФ\\Учёба\\3 курс\\Технологии обработки BD\\ТОБД22-ПМ20-Материалы к семинарам\\10_multiprocessing\\10_multiprocessing_data\\tag_nsteps.csv', 
                        nrows=7069558, 
                        skiprows=np.arange(0, 7069558*(i-1) + 1),
                        dtype={'tags': 'str', 'n_steps': np.int64},
                        names=['tags', 'n_steps'])
    file.to_csv(f'tag_nsteps_{i}.csv', sep=',', encoding='utf-8', index=False)

### 10 файл:

In [76]:
file_10 = pd.read_csv('C:\\Users\\Артём\\OneDrive - ФГОБУ ВО Финансовый университет при Правительстве РФ\\Учёба\\3 курс\\Технологии обработки BD\\ТОБД22-ПМ20-Материалы к семинарам\\10_multiprocessing\\10_multiprocessing_data\\tag_nsteps.csv', 
                     nrows=7069564,
                     skiprows=np.arange(0, 7069558*9 + 1),
                     dtype={'tags': 'str', 'n_steps': np.int64},
                     names=['tags', 'n_steps'])

file_10.to_csv(f'tag_nsteps_10.csv', sep=',', encoding='utf-8', index=False)

2\. Напишите функцию, которая принимает на вход название файла, созданного в результате решения задачи 1, считает для каждого тэга сумму по столбцу `n_steps` и количество строк c этим тэгом, и возвращает результат в виде словаря. Ожидаемый вид итогового словаря:

```
{
    '1-day-or-more': {'sum': 56616, 'count': 12752},
    '15-minutes-or-less': {'sum': 195413, 'count': 38898},
    '3-steps-or-less': {'sum': 187938, 'count': 39711},
    ....
}
```

Примените данную функцию к каждому файлу, полученному в задании 1, и соберите результат в виде списка словарей. Не используйте параллельных вычислений. 

Выведите на экран значение по ключу "30-minutes-or-less" для каждого из словарей.

In [5]:
def get_tag_sum_count_from_file(file: str) -> dict:
    df = pd.read_csv(file, dtype={'tags': 'str', 'n_steps': np.int64})
    df = df.groupby('tags') \
        .agg({'n_steps':'sum', 'tags':'count'}) \
        .rename(columns={'n_steps':'sum','tags':'count'})
    return df.T.to_dict('dict')

### Примените данную функцию к каждому файлу, полученному в задании 1, и соберите результат в виде списка словарей.

In [8]:
list_of_dicts = [get_tag_sum_count_from_file(f'tag_nsteps_{i}.csv') for i in range(1,11)]

### Выведите на экран значение по ключу "30-minutes-or-less" для каждого из словарей.

In [16]:
[d['30-minutes-or-less'] for d in list_of_dicts]

[{'sum': 278749, 'count': 36400},
 {'sum': 275881, 'count': 36302},
 {'sum': 283843, 'count': 37125},
 {'sum': 277321, 'count': 36547},
 {'sum': 278753, 'count': 36434},
 {'sum': 275007, 'count': 36251},
 {'sum': 278495, 'count': 36595},
 {'sum': 278785, 'count': 36692},
 {'sum': 277874, 'count': 36652},
 {'sum': 278497, 'count': 36784}]

3\. Напишите функцию, которая объединяет результаты обработки отдельных файлов. Данная функция принимает на вход список словарей, каждый из которых является результатом вызова функции `get_tag_sum_count_from_file` для конкретного файла, и агрегирует эти словари. Не используйте параллельных вычислений.

Процедура агрегации словарей имеет следующий вид:
$$d_{agg}[k] = \{sum: \sum_{i=1}^{n}d_{i}[k][sum], count: \sum_{i=1}^{n}d_{i}[k][count]\}$$
где $d_1, d_2, ..., d_n$- результат вызова функции `get_tag_sum_count_from_file` для конкретных файлов.

Примените данную функцию к результату выполнения задания 2. Выведите на экран результат для тэга "30-minutes-or-less".

In [6]:
def agg_results(tag_sum_count_list: list) -> dict:
    df = pd.DataFrame(tag_sum_count_list)
    dict_agg = {tag: {'sum':0, 'count':0} for tag in list(set(df.columns))}
    for file in list_of_dicts:
        for tag in file:
            dict_agg[tag]['sum'] += file[tag]['sum']
            dict_agg[tag]['count'] += file[tag]['count'] 
    return dict_agg

In [9]:
tag_sum_count = agg_results(list_of_dicts)

In [10]:
tag_sum_count['30-minutes-or-less']

{'sum': 2783205, 'count': 365782}

4\. Напишите функцию, которая считает среднее значение количества шагов для каждого тэга в словаре, имеющего вид, аналогичный словарям в задаче 2, и возвращает результат в виде словаря . Используйте решения задач 1-3, чтобы получить среднее значение количества шагов каждого тэга для всего датасета, имея результаты обработки частей датасета и результат их агрегации. Выведите на экран результат для тэга "30-minutes-or-less".

Определите, за какое время задача решается для всего датасета. При замере времени учитывайте время расчета статистики для каждого файла, агрегации результатов и, собственно, вычисления средного. Временем, затрачиваемым на процедуру разбиения исходного файла можно пренебречь.

In [19]:
def get_tag_mean_n_steps(tag_sum_count: dict) -> dict:
    df = pd.DataFrame(tag_sum_count).T
    df['mean'] = df['sum'] / df['count']
    df.drop(['sum','count'],axis=1,inplace=True)
    return df.T.to_dict('dict')

In [20]:
get_tag_mean_n_steps(tag_sum_count)['30-minutes-or-less']

{'mean': 7.608917333275011}

In [150]:
%%timeit
get_tag_mean_n_steps(agg_results([get_tag_sum_count_from_file(f'tag_nsteps_{i}.csv') for i in range(1,11)]))

29.8 s ± 2.58 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


5\. Повторите решение задачи 4, распараллелив вызовы функции `get_tag_sum_count_from_file` для различных файлов с помощью `multiprocessing.Pool`. Для обработки каждого файла создайте свой собственный процесс. Выведите на экран результат для тэга "30-minutes-or-less". Определите, за какое время задача решается для всех файлов. При замере времени учитывайте время расчета статистики для каждого файла, агрегации результатов и, собственно, вычисления средного. Временем, затрачиваемым на процедуру разбиения исходного файла можно пренебречь.

In [11]:
files = [f'tag_nsteps_{i}.csv' for i in range(1,11)]

In [16]:
%%file get_tag_sum_count_from_file.py
import pandas as pd
import numpy as np

def get_tag_sum_count_from_file(file: str) -> dict:
    df = pd.read_csv(file, dtype={'tags': 'str', 'n_steps': np.int64})
    df = df.groupby('tags') \
        .agg({'n_steps':'sum', 'tags':'count'}) \
        .rename(columns={'n_steps':'sum','tags':'count'})
    return df.T.to_dict('dict')

Overwriting get_tag_sum_count_from_file.py


In [17]:
from get_tag_sum_count_from_file import get_tag_sum_count_from_file

In [21]:
with multiprocessing.Pool(processes=10) as pool:
    res = pool.map(get_tag_sum_count_from_file, files)
    
get_tag_mean_n_steps(agg_results(res))['30-minutes-or-less']   

{'mean': 7.608917333275011}

In [22]:
%%timeit
with multiprocessing.Pool(processes=10) as pool:
    res = pool.map(get_tag_sum_count_from_file, files)
    
get_tag_mean_n_steps(agg_results(res)) 

8.42 s ± 146 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


6\. Повторите решение задачи 4, распараллелив вычисления функции `get_tag_sum_count_from_file` для различных файлов с помощью `multiprocessing.Process`. Для обработки каждого файла создайте свой собственный процесс. Для обмена данными между процессами используйте `multiprocessing.Queue`.

Выведите на экран результат для тэга "30-minutes-or-less". Определите, за какое время задача решается для всех файлов. При замере времени учитывайте время расчета статистики для каждого файла, агрегации результатов и, собственно, вычисления средного. Временем, затрачиваемым на процедуру разбиения исходного файла можно пренебречь.

In [23]:
%%file get_tag_sum_count_from_file_q.py
import pandas as pd
import numpy as np

def get_tag_sum_count_from_file_q(file: str, queue) -> dict:
    df = pd.read_csv(file, dtype={'tags': 'str', 'n_steps': np.int64})
    df = df.groupby('tags') \
        .agg({'n_steps':'sum', 'tags':'count'}) \
        .rename(columns={'n_steps':'sum','tags':'count'})
    return queue.put(df.T.to_dict('dict'))

Writing get_tag_sum_count_from_file_q.py


In [24]:
from get_tag_sum_count_from_file_q import get_tag_sum_count_from_file_q

In [25]:
ps = []
queue = multiprocessing.Queue()

for f in files:
    p = multiprocessing.Process(target=get_tag_sum_count_from_file_q, args=(f, queue))
    ps.append(p)
    p.start()

rs = []
while len(rs) < 10:
    if not queue.empty():
        rs.append(queue.get())
    
for p in ps:
    p.join()

get_tag_mean_n_steps(agg_results(rs))['30-minutes-or-less']   

{'mean': 7.608917333275011}

In [27]:
%%timeit
ps = []
queue = multiprocessing.Queue()

for f in files:
    p = multiprocessing.Process(target=get_tag_sum_count_from_file_q, args=(f, queue))
    ps.append(p)
    p.start()

rs = []
while len(rs) < 10:
    if not queue.empty():
        rs.append(queue.get())
    
for p in ps:
    p.join()

get_tag_mean_n_steps(agg_results(rs)) 

9.39 s ± 79.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


7\. Исследуйте, как влияет количество запущенных одновременно процессов на скорость решения задачи. Узнайте количество ядер вашего процессора $K$. Повторите решение задачи 1, разбив исходный файл на $\frac{K}{2}$, $K$ и $2K$ фрагментов. Для каждого из разбиений повторите решение задачи 5. Визуализируйте зависимость времени выполнения кода от количества файлов в разбиении. Сделайте вывод в виде текстового комментария.

8\. Напишите функцию `parallel_map`, которая принимает на вход серию `s` `pd.Series` и функцию одного аргумента `f` и поэлементно применяет эту функцию к серии, распараллелив вычисления при помощи пакета `multiprocessing`. Логика работы функции `parallel_map` должна включать следующие действия:
* разбиение исходной серии на $K$ частей, где $K$ - количество ядер вашего процессора;
* параллельное применение функции `f` к каждой части при помощи метода _серии_ `map` при помощи нескольких подпроцессов;
* объединение результатов работы подпроцессов в одну серию. 

In [None]:
def parallel_map(s: pd.Series, f: callable) -> pd.Series:
    pass

9\. Напишите функцию `f`, которая принимает на вход тэг и проверяет, удовлетворяет ли тэг следующему шаблону: `[любое число]-[любое слово]-or-less`. Возьмите любой фрагмент файла, полученный в задании 1, примените функцию `f` при помощи `parallel_map` к столбцу `tags` и посчитайте количество тэгов, подходящих под этот шаблон. Решите ту же задачу, воспользовавшись методом _серий_ `map`. Сравните время и результат выполнения двух решений.

In [None]:
def f(tag: str) -> bool:
    pass

10\. Используя пакет `pandarallel`, примените функцию `f` из задания 9 к столбцу `tags` таблицы, с которой вы работали этом задании. Посчитайте количество тэгов, подходящих под описанный шаблон. Измерьте время выполнения кода. Выведите на экран полученный результат.