# Параллельные вычисления

Материалы:
* Макрушин С.В. Лекция 10: Параллельные вычисления
* https://docs.python.org/3/library/multiprocessing.html

## Задачи для совместного разбора

1. Посчитайте, сколько раз встречается каждый из символов (заглавные и строчные символы не различаются) в файле `Dostoevskiy Fedor. Prestuplenie i nakazanie - BooksCafe.Net.txt` и в файле `Dostoevskiy Fedor. Igrok - BooksCafe.Net.txt`. 

2. Решить задачу 1, распараллелив вычисления с помощью модуля `multiprocessing`. Для обработки каждого файла создать свой собственный процесс. 

## Лабораторная работа 10

1. Разбейте файл `recipes_full.csv` на несколько (например, 8) примерно одинаковых по объему файлов c названиями `id_tag_nsteps_*.csv`. Каждый файл содержит 3 столбца: `id`, `tag` и `n_steps`, разделенных символом `;`. Для разбора строк используйте `csv.reader`.

__Важно__: вы не можете загружать в память весь файл сразу. Посмотреть на первые несколько строк файла вы можете, написав код, который считывает эти строки.

Подсказка: примерное кол-во строк в файле - 2.3 млн.

Фрагмент одного из файлов, которые должны получиться в результате:
```
id;tag;n_steps
137739;60-minutes-or-less;11
137739;time-to-make;11
137739;course;11
```


In [16]:
import pandas as pd
import numpy as np

N = 10
size = 2_300_000 / N
data = pd.read_csv("./data/recipes_full.csv", chunksize=size)

for idx, df in enumerate(data):
    df = df[["id", "tags", "n_steps"]]
    tag_list = df["tags"].map(lambda x: x.strip("[]").replace("'", "").split(", ")).to_list()
    df = df.drop(columns=["tags"])
    df["tags"] = tag_list
    list_of_tags, list_of_ids, list_of_n_steps = [], [], []
    
    for _, row in df.iterrows():
        # переводим df в списки из элементов, чтобы собрать newdf, где будет лежать только один tag на строку
        list_of_tags.extend([tag for tag in row["tags"]])
        list_of_ids.extend([row["id"]] * len(row["tags"]))
        list_of_n_steps.extend([row["n_steps"]] * len(row["tags"]))

    newdf = pd.DataFrame(data=list(zip(list_of_ids, list_of_tags, list_of_n_steps)), columns=[["id", "tag", "n_steps"]]) 
    newdf.to_csv(f"./data/id_tag_nsteps_{idx}.csv", index=False)
    print(f"Файл №{idx} сохранен")

Файл №0 сохранен
Файл №1 сохранен
Файл №2 сохранен
Файл №3 сохранен
Файл №4 сохранен
Файл №5 сохранен
Файл №6 сохранен
Файл №7 сохранен
Файл №8 сохранен
Файл №9 сохранен


2. Напишите функцию, которая принимает на вход название файла, созданного в результате решения задачи 1, считает среднее значение количества шагов для каждого тэга и возвращает результат в виде словаря.

In [None]:
def average_steps(filename):
    df = pd.read_csv("./data/"+filename+".csv")
    return df.groupby(['tag']).agg('mean').to_dict()['n_steps']

d = average_steps("id_tag_nsteps_9")
print(type(d), d)

3. Напишите функцию, которая считает среднее значение количества шагов для каждого тэга по всем файлам, полученным в задаче 1, и возвращает результат в виде словаря. Не используйте параллельных вычислений. При реализации выделите функцию, которая объединяет результаты обработки отдельных файлов. Модифицируйте код из задачи 2 таким образом, чтобы получить результат, имея результаты обработки отдельных файлов. Определите, за какое время задача решается для всех файлов.


In [3]:
from datetime import datetime

def get_all_tags():

    tags = {}
    for i in range(N):
        filename = "id_tag_nsteps_"+str(i)
        tags[filename] = average_steps(filename)
    return tags

def combine_results(data):
    temp = {}
    for _, tag in data.items():
        for key, value in tag.items():
            if key not in temp:
                temp[key] = []
            temp[key].append(value)

    combined = {}
    for k, v in temp.items():
        combined[k] = np.mean(v)
    return combined

start = datetime.now()
tag_data = get_all_tags()
result = combine_results(tag_data)
print(datetime.now() - start)
# print(result)


0:00:05.106930


4. Решите задачу 3, распараллелив вычисления с помощью модуля `multiprocessing`. Для обработки каждого файла создайте свой собственный процесс. Определите, за какое время задача решается для всех файлов.

In [4]:
%%file task_util.py
import pandas as pd
import numpy as np


def average_steps(filename):
    df = pd.read_csv("./data/"+filename+".csv")
    return df.groupby(['tag']).agg('mean').to_dict()['n_steps']

def queue_func(input_, output):
    for filename in iter(input_.get, 'STOP'):
        result = average_steps(filename)
        output.put(result)
        
def combine_results(data):
    temp = {}
    for _, tag in data.items():
        for key, value in tag.items():
            if key not in temp:
                temp[key] = []
            temp[key].append(value)

    combined = {}
    for k, v in temp.items():
        combined[k] = np.mean(v)
    return combined

Overwriting task_util.py


In [7]:

from datetime import datetime
from multiprocessing import Pool, cpu_count, Queue, Process
import task_util
if __name__ == '__main__':
    N = 10
    start = datetime.now()
    
    filenames = ["id_tag_nsteps_"+str(i) for i in range(N)]
    
    with Pool(N) as pool:
        result_list = pool.map(task_util.average_steps, filenames)

    result = {}
    for idx, tag_data in enumerate(result_list):
        result["id_tag_nsteps_"+str(idx)] = tag_data
    
    task_util.combine_results(result)
    print(datetime.now() - start)

id_tag_nsteps_9
id_tag_nsteps_1
id_tag_nsteps_2
id_tag_nsteps_0
id_tag_nsteps_4
id_tag_nsteps_3
id_tag_nsteps_5
id_tag_nsteps_6
id_tag_nsteps_7
id_tag_nsteps_8
0:00:02.191060


5. Решите задачу 3, распараллелив вычисления с помощью модуля `multiprocessing`. Создайте фиксированное количество процессов (равное половине количества ядер на компьютере). При помощи очереди `multiprocessing.queue` передайте названия файлов для обработки процессам и при помощи другой очереди заберите от них ответы. 

In [None]:
if __name__ == '__main__':

    count = cpu_count() // 4
    files_queue = Queue()
    done_queue = Queue()

    filenames = ["id_tag_nsteps_"+str(i) for i in range(N)]

    for file in filenames:
        files_queue.put(file)

    for i in range(count):
        Process(target=task_util.queue_func, args=(files_queue, done_queue)).start()

    result = {}
    for idx, tag_data in enumerate(result_list):
        result["id_tag_nsteps_"+str(idx)] = done_queue.get()

    for i in range(count):
        files_queue.put('STOP')

    print("Кол-во файлов:", len(result))
    result = task_util.combine_results(result)
    # print(result)
