# Параллельные вычисления

Материалы:
* Макрушин С.В. Лекция 10: Параллельные вычисления
* https://docs.python.org/3/library/multiprocessing.html

## Задачи для совместного разбора

1. Посчитайте, сколько раз встречается каждый из символов (заглавные и строчные символы не различаются) в файле `Dostoevskiy Fedor. Prestuplenie i nakazanie - BooksCafe.Net.txt` и в файле `Dostoevskiy Fedor. Igrok - BooksCafe.Net.txt`. 

In [15]:
import multiprocessing as mp
import csv
import pandas as pd

In [8]:
%%file let_count.py
from collections import Counter

def let_count(file):
    
    with open(file) as fp:
        text = fp.read().lower()
        return Counter(text)

Overwriting let_count.py


In [3]:
let_count("./data/Dostoevskiy Fedor. Prestuplenie i nakazanie - BooksCafe.Net.txt")

Counter({'с': 50084,
         'п': 25652,
         'а': 73555,
         'и': 62030,
         'б': 16016,
         'о': 106740,
         ',': 26973,
         ' ': 182305,
         'ч': 16492,
         'т': 59813,
         'к': 30802,
         'л': 42328,
         'н': 60920,
         'г': 16174,
         'у': 27309,
         'в': 43700,
         'е': 80972,
         'й': 9747,
         'э': 3203,
         'р': 39784,
         'b': 25,
         'o': 104,
         'k': 16,
         's': 96,
         'c': 42,
         'a': 98,
         'f': 23,
         'e': 162,
         '.': 9864,
         'n': 114,
         't': 98,
         ':': 984,
         'h': 48,
         'p': 29,
         '/': 22,
         '\n': 8583,
         'u': 86,
         'r': 76,
         'd': 38,
         'v': 65,
         'i': 235,
         'y': 5,
         '_': 8,
         '-': 3558,
         '1': 384,
         '0': 110,
         '9': 100,
         '6': 271,
         'm': 54,
         'l': 46,
         'ж': 10552,
     

In [13]:
let_count("./data/Dostoevskiy Fedor. Igrok - BooksCafe.Net.txt")

Counter({'с': 11507,
         'п': 5489,
         'а': 18236,
         'и': 13587,
         'б': 3980,
         'о': 23130,
         ',': 6372,
         ' ': 45076,
         'ч': 4113,
         'т': 14245,
         'к': 6744,
         'л': 9961,
         'н': 14240,
         'г': 3948,
         'у': 6044,
         'в': 9398,
         'е': 20054,
         'й': 2028,
         'э': 836,
         'р': 9482,
         'b': 220,
         'o': 377,
         'k': 21,
         's': 429,
         'c': 324,
         'a': 590,
         'f': 52,
         'e': 1200,
         '.': 2954,
         'n': 459,
         't': 332,
         ':': 212,
         'h': 227,
         'p': 100,
         '/': 20,
         '\n': 2734,
         'u': 285,
         'r': 308,
         'd': 192,
         'v': 87,
         'i': 369,
         'y': 8,
         '_': 4,
         '-': 900,
         '1': 46,
         '0': 22,
         '9': 36,
         '6': 42,
         'm': 401,
         'l': 571,
         'ж': 2297,
         'д

2. Решить задачу 1, распараллелив вычисления с помощью модуля `multiprocessing`. Для обработки каждого файла создать свой собственный процесс. 

In [9]:
from let_count import let_count

In [23]:
if __name__ == "__main__":
    files = ["./data/Dostoevskiy Fedor. Prestuplenie i nakazanie - BooksCafe.Net.txt", "./data/Dostoevskiy Fedor. Igrok - BooksCafe.Net.txt"]
    pool = mp.Pool(processes=4)
    counters = pool.map(let_count, files)

In [24]:
counters

[Counter({'с': 50084,
          'п': 25652,
          'а': 73555,
          'и': 62030,
          'б': 16016,
          'о': 106740,
          ',': 26973,
          ' ': 182305,
          'ч': 16492,
          'т': 59813,
          'к': 30802,
          'л': 42328,
          'н': 60920,
          'г': 16174,
          'у': 27309,
          'в': 43700,
          'е': 80972,
          'й': 9747,
          'э': 3203,
          'р': 39784,
          'b': 25,
          'o': 104,
          'k': 16,
          's': 96,
          'c': 42,
          'a': 98,
          'f': 23,
          'e': 162,
          '.': 9864,
          'n': 114,
          't': 98,
          ':': 984,
          'h': 48,
          'p': 29,
          '/': 22,
          '\n': 8583,
          'u': 86,
          'r': 76,
          'd': 38,
          'v': 65,
          'i': 235,
          'y': 5,
          '_': 8,
          '-': 3558,
          '1': 384,
          '0': 110,
          '9': 100,
          '6': 271,
          'm':

## Лабораторная работа 10

1. Разбейте файл `recipes_full.csv` на несколько (например, 8) примерно одинаковых по объему файлов c названиями `id_tag_nsteps_*.csv`. Каждый файл содержит 3 столбца: `id`, `tag` и `n_steps`, разделенных символом `;`. Для разбора строк используйте `csv.reader`.

__Важно__: вы не можете загружать в память весь файл сразу. Посмотреть на первые несколько строк файла вы можете, написав код, который считывает эти строки.

Подсказка: примерное кол-во строк в файле - 2.3 млн.

Фрагмент одного из файлов, которые должны получиться в результате:
```
id;tag;n_steps
137739;60-minutes-or-less;11
137739;time-to-make;11
137739;course;11
```


In [32]:
with open('./data/recipes_full.csv', newline='', encoding="utf-8") as file:
    recipes = csv.DictReader(file)

    count = 0
    file_number = 1
    file_write = open('./output/id_tag_nsteps_' + str(file_number) + '.csv', 'w', encoding="utf-8")
    writer = csv.writer(file_write,  delimiter = ";", lineterminator="\r")   
    for row in recipes:
        count += 1
        if count < 287360:
            if count == 1:
                writer.writerow(["id", "tags", "n_steps"])
            lst = row["tags"][1:-1].split(", ")
            tag_lst = [i[1:-1] for i in lst]
            for tag in tag_lst:
                writer.writerow([row["id"], tag, row["n_steps"]])
        else:
            count = 0
            file_number += 1
            file_write.close()
            file_write = open('./output/id_tag_nsteps_' + str(file_number) + '.csv', 'w', encoding="utf-8")
            writer = csv.writer(file_write,  delimiter = ";", lineterminator="\r")   


2. Напишите функцию, которая принимает на вход название файла, созданного в результате решения задачи 1, считает среднее значение количества шагов для каждого тэга и возвращает результат в виде словаря.

In [40]:
%%file mean_step_file.py
import pandas as pd

def mean_step(file):
    tags_dict = {}
    df = pd.read_csv(file, sep=";")
    df_tags = df.groupby('tags').mean()["n_steps"]
    df = df.dropna()
    df_tags = df_tags.dropna()
    for tag in df["tags"]:
        if tag not in tags_dict:
            tags_dict[tag] = df_tags[tag]
    return tags_dict

Writing mean_step_file.py


In [34]:
tags_dict = mean_step('./output/id_tag_nsteps_1.csv')
tags_dict

{'mexican': 5.310914454277286,
 'healthy-2': 6.334017094017094,
 'orange-roughy': 3.4486692015209126,
 'chicken-thighs-legs': 4.181159420289855,
 'freezer': 3.9193485847227607,
 'whitefish': 3.525452250736222,
 'pork-sausage': 4.298745724059293,
 'brunch': 6.907961246840775,
 'ham-and-bean-soup': 3.5179180887372015,
 'colombian': 3.5746367239101717,
 'savory-pies': 4.292712066905615,
 'refrigerator': 4.749388325760224,
 'australian': 4.247299813780261,
 'served-cold': 4.941708229426434,
 'spaghetti': 4.130796670630202,
 'passover': 3.629370629370629,
 'quick-breads': 4.964539007092198,
 'californian': 3.7518218623481783,
 'namibian': 3.5014861995753717,
 'candy': 4.293940534437335,
 'independence-day': 4.14301717699776,
 'baking': 3.5878839590443685,
 'pennsylvania-dutch': 3.538971807628524,
 'weeknight': 7.436413361984763,
 '60-minutes-or-less': 9.321623731459797,
 'time-to-make': 9.247789003241746,
 'course': 9.237364209978534,
 'cuisine': 9.127411772960079,
 'preparation': 9.2897949

3. Напишите функцию, которая считает среднее значение количества шагов для каждого тэга по всем файлам, полученным в задаче 1, и возвращает результат в виде словаря. Не используйте параллельных вычислений. При реализации выделите функцию, которая объединяет результаты обработки отдельных файлов. Модифицируйте код из задачи 2 таким образом, чтобы получить результат, имея результаты обработки отдельных файлов. Определите, за какое время задача решается для всех файлов.


In [16]:
def merge_df(file, prev_df = pd.DataFrame()):
    df = pd.read_csv(file, sep=";")
    df = df.append(prev_df)
    return df

In [17]:
def mean_step_df(df):
    tags_dict = {}
    df_tags = df.groupby('tags').mean()["n_steps"]
    df = df.dropna()
    df_tags = df_tags.dropna()
    for tag in df["tags"]:
        if tag not in tags_dict:
            tags_dict[tag] = df_tags[tag]
    return tags_dict

In [18]:
%%time
df = merge_df('./output/id_tag_nsteps_1.csv')
for i in range(2, 9):
    df = merge_df('./output/id_tag_nsteps_' + str(i) + '.csv', df)
tags_dict = mean_step_df(df)
tags_dict

Wall time: 14.1 s


{'middle-eastern': 4.191139303236038,
 'lamb-sheep': 4.130016786204791,
 'granola-and-porridge': 3.7370038545681843,
 'south-african': 3.6286164034087824,
 'pork-ribs': 3.711184495452577,
 'chilean': 3.5288842429577465,
 'greek': 4.349770081205362,
 'low-sodium': 7.3863798933541425,
 'yams-sweet-potatoes': 4.005262620069487,
 'roast': 4.020509341404864,
 'herb-and-spice-mixes': 3.552204547880858,
 'jellies': 3.5721814724259713,
 'salads': 5.062135863099777,
 'italian': 5.8739571150097465,
 '5-ingredients-or-less': 5.345920719916704,
 'preparation': 9.293416400880012,
 'to-go': 6.433847655720368,
 'bisques-cream-soups': 4.132540995938017,
 'shakes': 3.525634933206611,
 'south-american': 3.941121832911915,
 'welsh': 3.5899640796778054,
 'poultry': 7.673893983554892,
 'low-carb': 7.247922207446808,
 '30-minutes-or-less': 7.610132158590308,
 'time-to-make': 9.27850778138217,
 'course': 9.274686706181202,
 'main-ingredient': 9.315210297605468,
 'cuisine': 9.1700252580716,
 'occasion': 9.136

4. Решите задачу 3, распараллелив вычисления с помощью модуля `multiprocessing`. Для обработки каждого файла создайте свой собственный процесс. Определите, за какое время задача решается для всех файлов.

In [35]:
%%file create_df_file.py
import pandas as pd
def create_df(file):
    df = pd.read_csv(file, sep=";")
    return df

Writing create_df_file.py


In [44]:
%%time
#более точное, но менее эффективное решение
import create_df_file
if __name__ == "__main__":
    files = ['./output/id_tag_nsteps_' + str(i) + '.csv' for i in range(1, 9)]
    pool = mp.Pool(processes=len(files))
    df_lst = pool.map(create_df_file.create_df, files)

tags_dict = mean_step_df(pd.concat(df_lst))
tags_dict

Wall time: 10.9 s


{'mexican': 5.302344316442439,
 'healthy-2': 6.384162244806188,
 'orange-roughy': 3.513425052701653,
 'chicken-thighs-legs': 4.145581465931509,
 'freezer': 4.033042234819468,
 'whitefish': 3.514734127201888,
 'pork-sausage': 4.256068444090729,
 'brunch': 6.871661962657403,
 'ham-and-bean-soup': 3.508423254789694,
 'colombian': 3.5359842260926717,
 'savory-pies': 4.298328243879716,
 'refrigerator': 4.702350782137551,
 'australian': 4.218603314493725,
 'served-cold': 4.911621189054144,
 'spaghetti': 4.0825152293208475,
 'passover': 3.658676110051757,
 'quick-breads': 5.059023265562775,
 'californian': 3.74143203627544,
 'namibian': 3.5042895887529752,
 'candy': 4.229612689762553,
 'independence-day': 4.10637159533074,
 'baking': 3.6306821245618766,
 'pennsylvania-dutch': 3.5471966710468683,
 'weeknight': 7.413649806241077,
 '60-minutes-or-less': 9.413654300607185,
 'time-to-make': 9.27850778138217,
 'course': 9.274686706181202,
 'cuisine': 9.1700252580716,
 'preparation': 9.2934164008800

In [43]:
%%time
#менее точное, но более эффективное решение
import mean_step_file
if __name__ == "__main__":
    files = ['./output/id_tag_nsteps_' + str(i) + '.csv' for i in range(1, 9)]
    pool = mp.Pool(processes=len(files))
    dict_lst = pool.map(mean_step_file.mean_step, files)

answer_dict = dict_lst[0]

for item in dict_lst[1:]:
    answer_dict = {**answer_dict, **item}
answer_dict

Wall time: 5.38 s


{'mexican': 5.237799834574028,
 'healthy-2': 6.4895543175487465,
 'orange-roughy': 3.4735955056179777,
 'chicken-thighs-legs': 4.114686040477426,
 'freezer': 4.025737265415549,
 'whitefish': 3.563073394495413,
 'pork-sausage': 4.4013464526152255,
 'brunch': 6.8151685393258425,
 'ham-and-bean-soup': 3.4655847789591494,
 'colombian': 3.513454146073586,
 'savory-pies': 4.320610687022901,
 'refrigerator': 4.708088567555355,
 'australian': 4.2594224180127265,
 'served-cold': 4.973520249221184,
 'spaghetti': 4.14344262295082,
 'passover': 3.6161504424778763,
 'quick-breads': 5.207394815129621,
 'californian': 3.7439712673165726,
 'namibian': 3.5486425339366514,
 'candy': 4.237891737891738,
 'independence-day': 4.0960614793467816,
 'baking': 3.629139072847682,
 'pennsylvania-dutch': 3.603139013452915,
 'weeknight': 7.309997416688194,
 '60-minutes-or-less': 9.454514207149405,
 'time-to-make': 9.286383210000418,
 'course': 9.288817065287654,
 'cuisine': 9.219791472723887,
 'preparation': 9.2988

5. Решите задачу 3, распараллелив вычисления с помощью модуля `multiprocessing`. Создайте фиксированное количество процессов (равное половине количества ядер на компьютере). При помощи очереди `multiprocessing.queue` передайте названия файлов для обработки процессам и при помощи другой очереди заберите от них ответы. 

In [4]:
%%file queue_file.py

import pandas as pd
from multiprocessing import Process, Queue, current_process, freeze_support

def worker(input_, output):
    for args in iter(input_.get, 'STOP'):
        result = mean_step(args)
        output.put(result)


def mean_step(file):
    tags_dict = {}
    df = pd.read_csv(file, sep=";")
    df_tags = df.groupby('tags').mean()["n_steps"]
    df = df.dropna()
    df_tags = df_tags.dropna()
    for tag in df["tags"]:
        if tag not in tags_dict:
            tags_dict[tag] = df_tags[tag]
    return tags_dict

Overwriting queue_file.py


In [13]:
%%time
import queue_file
from multiprocessing import Process, Queue, current_process, freeze_support

def mean_count():
    NUMBER_OF_PROCESSES = 2
    files = ['./output/id_tag_nsteps_' + str(i) + '.csv' for i in range(1, 9)]

    task_queue = Queue()
    done_queue = Queue()

    for task in files:
        task_queue.put(task)

    for i in range(NUMBER_OF_PROCESSES):
        Process(target=queue_file.worker, args=(task_queue, done_queue)).start()

    answer_dict = done_queue.get()
    for i in range(1, len(files)):
        answer_dict = {**answer_dict, **done_queue.get()}

    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')
        
    return answer_dict

freeze_support()

answer_dict = mean_count()
answer_dict

Wall time: 5.93 s


{'brewing': 3.5512489233419466,
 'savory-pies': 4.33929331630481,
 'italian': 5.731601731601732,
 'romantic': 5.115137148662377,
 'clear-soups': 3.668134507606085,
 'french': 4.5325329202168865,
 'long-grain-rice': 3.965784114052953,
 'pacific-northwest': 3.7850895460224905,
 'muffins': 4.350530552506403,
 'chutneys': 3.4971994829814737,
 'beef-ribs': 3.6419437340153453,
 'palestinian': 3.4731273804485823,
 'gifts': 5.62914511712808,
 'stews': 4.179968701095461,
 'cherries': 3.680960548885077,
 'chicken-breasts': 5.773991031390135,
 'mushroom-soup': 3.4742630884293884,
 'kiwifruit': 3.508720930232558,
 'freshwater-fish': 3.848278622898319,
 'prepared-potatoes': 3.5017421602787455,
 'high-fiber': 3.5095785440613025,
 'ontario': 3.7576,
 'roast-beef-main-dish': 3.486706689536878,
 'greens': 4.6670182841068915,
 'holiday-event': 8.438943894389439,
 'creole': 3.7266881028938905,
 'bananas': 4.009750390015601,
 'garnishes': 3.5953684210526315,
 'tomatoes': 4.867651782846129,
 'raspberries':