# Параллельные вычисления

Материалы:
* Макрушин С.В. Лекция 10: Параллельные вычисления
* https://docs.python.org/3/library/multiprocessing.html

## Задачи для совместного разбора

1. Посчитайте, сколько раз встречается каждый из символов (заглавные и строчные символы не различаются) в файле `Dostoevskiy Fedor. Prestuplenie i nakazanie - BooksCafe.Net.txt` и в файле `Dostoevskiy Fedor. Igrok - BooksCafe.Net.txt`. 

In [None]:
with open('Dostoevskiy Fedor. Prestuplenie i nakazanie - BooksCafe.Net.txt', 'r', encoding="windows-1251") as f:
  text = f.read().lower()

with open('Dostoevskiy Fedor. Igrok - BooksCafe.Net.txt', 'r', encoding="windows-1251") as f:
  text +=  f.read().lower()

In [1]:
from collections import defaultdict

In [None]:
%%time

d = defaultdict(int)
def counter(key):
  d[key] += 1

for key in text:
  counter(key)

CPU times: user 457 ms, sys: 0 ns, total: 457 ms
Wall time: 462 ms


In [None]:
list(d.items())[20:25]

[('b', 245), ('o', 481), ('k', 37), ('s', 525), ('c', 366)]

In [2]:
import numpy

In [None]:
%%time

unique, counts = numpy.unique(list(text), return_counts=True)
d = dict(zip(unique, counts))

CPU times: user 359 ms, sys: 46.5 ms, total: 406 ms
Wall time: 404 ms


In [None]:
list(d.items())[20:25]

[('7', 163), ('8', 335), ('9', 136), (':', 1196), (';', 1728)]

2. Решить задачу 1, распараллелив вычисления с помощью модуля `multiprocessing`. Для обработки каждого файла создать свой собственный процесс. 

In [3]:
import multiprocessing

In [None]:
len(text)//11
# Все делители числа 1 484 659:
# 1, 11,  139,  971, 1 529, 10 681,  134 969, 1 484 659

134969

In [None]:
%%time

n_process = 11

def process_helper(_text):
  d = defaultdict(int)
  def counter(key):
    d[key] += 1
  for i in range(0, len(_text)):
    counter(text[i+j])
  return d

list_text = [text[j*len(text)//n_process:(j+1)*len(text)//n_process] for j in range(n_process)]
pool_obj = multiprocessing.Pool()
answer = pool_obj.map(process_helper, list_text)

def join_dicts(x, y):
  return {k: x.get(k, 0) + y.get(k, 0) for k in set(x) | set(y)}

d = {}
for _d in answer:
  d = join_dicts(_d, d)

CPU times: user 31.9 ms, sys: 69.9 ms, total: 102 ms
Wall time: 632 ms


In [None]:
list(d.items())[20:25]

[('z', 11), ('л', 52316), (',', 28677), (']', 121), ('k', 88)]

## Лабораторная работа 10

In [12]:
import requests

url = 'https://docs.yandex.ru/docs/view?url=ya-disk-public%3A%2F%2FspiubVVFcS0tgFcQE%2F0JPqxqVoZjKwEzANnNTYhpkB2%2BkuBZOqijCDk1CpASYCl6q%2FJ6bpmRyOJonT3VoXnDag%3D%3D%3A%2Fdatasets%2Frecipes_full.csv&name=recipes_full.csv&nosw=1'
r = requests.get(url, allow_redirects=True)
open('./data/recipes_full.csv', 'wb').write(r.content)

18126

1. Разбейте файл `recipes_full.csv` на несколько (например, 8) примерно одинаковых по объему файлов c названиями `id_tag_nsteps_*.csv`. Каждый файл содержит 3 столбца: `id`, `tag` и `n_steps`, разделенных символом `;`. Для разбора строк используйте `csv.reader`.

__Важно__: вы не можете загружать в память весь файл сразу. Посмотреть на первые несколько строк файла вы можете, написав код, который считывает эти строки.

Подсказка: примерное кол-во строк в файле - 2.3 млн.

```
id;tag;n_steps
137739;60-minutes-or-less;11
137739;time-to-make;11
137739;course;11
```


In [15]:
import csv

n_f_ws = 8
f_ws = [f'id_tag_nsteps_{i}.csv' for i in range(n_f_ws)]
with open('./data/recipes_full.csv') as f:  
    reader = csv.DictReader(f)
    for n, row in enumerate(reader):
        f_w = f_ws[n%n_f_ws]
        r = [row['id'], row['tags'], row['n_steps']]
        with open(f_w, 'a+', newline='') as w:
            csv.writer(w).writerow(r)

KeyError: 'id'

2. Напишите функцию, которая принимает на вход название файла, созданного в результате решения задачи 1, считает среднее значение количества шагов для каждого тэга и возвращает результат в виде словаря.

In [11]:
from collections import defaultdict
import ast

def get_mean_steps_by_tag(fname):
  tags_and_steps = defaultdict(list)
  with open(fname) as f:  
    reader = csv.reader(f)
    for row in reader:
      tags = ast.literal_eval(row[1])
      steps = int(row[2])
      for tag in tags:
        tags_and_steps[tag].append(steps)
  del tags_and_steps['']
  tags_and_steps = dict(zip(
    list(tags_and_steps.keys()), 
    list(map(lambda x: sum(tags_and_steps[x])/len(tags_and_steps[x]), tags_and_steps))
  ))
  return tags_and_steps

get_mean_steps_by_tag('id_tag_nsteps_0.csv')

{'mexican': 4.5,
 'healthy-2': 6.0,
 'orange-roughy': 4.0,
 'chicken-thighs-legs': 5.0,
 'freezer': 6.666666666666667,
 'whitefish': 4.0,
 'pork-sausage': 4.0,
 'filipino': 4.0,
 'for-large-groups': 7.0,
 'pasta-salad': 4.0,
 'rosh-hashana': 4.75,
 'cambodian': 5.5,
 'pasta': 6.75,
 'fruit': 4.0,
 'cabbage': 6.0,
 'grains': 6.0,
 'equipment': 9.375,
 'lime': 5.0,
 'low-sodium': 5.333333333333333,
 'bass': 5.0,
 'meatballs': 3.0,
 'veal': 3.0,
 'prepared-potatoes': 3.5,
 'oaxacan': 3.5,
 'collard-greens': 3.5,
 'pheasant': 6.0,
 'fudge': 6.0,
 'micro-melanesia': 6.0,
 'cajun': 5.0,
 'breakfast-eggs': 6.0,
 'honduran': 5.0,
 'halloween-cocktails': 5.0,
 'beef': 5.8,
 'irish': 3.5,
 'turkey': 8.333333333333334,
 'Throw the ultimate fiesta with this sopaipillas recipe from Food.com.': 3.3333333333333335,
 'gluten-free': 3.6666666666666665,
 'pickeral': 3.3333333333333335,
 'south-american': 4.0,
 'black-bean-soup': 2.0,
 'pasta-rice-and-grains': 5.75,
 'food-processor-blender': 2.0,
 'brit

3. Напишите функцию, которая считает среднее значение количества шагов для каждого тэга по всем файлам, полученным в задаче 1, и возвращает результат в виде словаря. Не используйте параллельных вычислений. При реализации выделите функцию, которая объединяет результаты обработки отдельных файлов. Модифицируйте код из задачи 2 таким образом, чтобы иметь возможность получить результат, имея результаты обработки отдельных файлов. Определите, за какое время задача решается для всех файлов.


In [None]:
%%time

from collections import defaultdict
import ast

def get_steps_and_tag(fname):
  print('Файл в процессе:', fname)
  tags_and_steps = defaultdict(list)
  with open(fname) as f:  
    reader = csv.reader(f)
    for row in reader:
      tags = ast.literal_eval(row[1])
      steps = int(row[2])
      for tag in tags:
        tags_and_steps[tag].append(steps)
  del tags_and_steps['']
  print('Файл  обработан:', fname)
  return tags_and_steps

def disp_all_files(n=8):
  tags_and_steps = defaultdict(list)
  for i in range(n):
    cur = get_steps_and_tag(f'id_tag_nsteps_{i}.csv')
    for key, value in cur.items():
      tags_and_steps[key] += value
  tags_and_steps = dict(zip(
    list(tags_and_steps.keys()), 
    list(map(lambda x: sum(tags_and_steps[x])/len(tags_and_steps[x]), tags_and_steps))
  ))
  return tags_and_steps

print(list(disp_all_files().items())[:10])

Файл в процессе: id_tag_nsteps_0.csv
Файл  обработан: id_tag_nsteps_0.csv
Файл в процессе: id_tag_nsteps_1.csv
Файл  обработан: id_tag_nsteps_1.csv
Файл в процессе: id_tag_nsteps_2.csv
Файл  обработан: id_tag_nsteps_2.csv
Файл в процессе: id_tag_nsteps_3.csv
Файл  обработан: id_tag_nsteps_3.csv
Файл в процессе: id_tag_nsteps_4.csv
Файл  обработан: id_tag_nsteps_4.csv
Файл в процессе: id_tag_nsteps_5.csv
Файл  обработан: id_tag_nsteps_5.csv
Файл в процессе: id_tag_nsteps_6.csv
Файл  обработан: id_tag_nsteps_6.csv
Файл в процессе: id_tag_nsteps_7.csv
Файл  обработан: id_tag_nsteps_7.csv
[('mexican', 5.302344316442439), ('healthy-2', 6.384162244806188), ('orange-roughy', 3.513425052701653), ('chicken-thighs-legs', 4.145581465931509), ('freezer', 4.033042234819468), ('whitefish', 3.514734127201888), ('pork-sausage', 4.256068444090729), ('filipino', 3.575355140695586), ('for-large-groups', 7.292883853009813), ('pasta-salad', 3.5048206710374084)]
CPU times: user 50.2 s, sys: 1.26 s, total: 5

4. Решите задачу 3, распараллелив вычисления с помощью модуля `multiprocessing`. Для обработки каждого файла создайте свой собственный процесс. Определите, за какое время задача решается для всех файлов.

In [None]:
%%time

from collections import defaultdict
import ast
import multiprocessing

def get_steps_and_tag(fname):
  print('Файл в процессе:', fname)
  tags_and_steps = defaultdict(list)
  with open(fname) as f:  
    reader = csv.reader(f)
    for row in reader:
      tags = ast.literal_eval(row[1])
      steps = int(row[2])
      for tag in tags:
        tags_and_steps[tag].append(steps)
  del tags_and_steps['']
  print('Файл  обработан:', fname)
  return tags_and_steps

def disp_all_files(n=8, proc_n=2):
  tags_and_steps = defaultdict(list)
  fnames = [f'id_tag_nsteps_{i}.csv' for i in range(n)]
  all = multiprocessing.Pool(processes=proc_n).map(get_steps_and_tag, fnames)
  for cur in all:
    for key, value in cur.items():
      tags_and_steps[key] += value
  tags_and_steps = dict(zip(
    list(tags_and_steps.keys()), 
    list(map(lambda x: sum(tags_and_steps[x])/len(tags_and_steps[x]), tags_and_steps))
  ))
  return tags_and_steps

print(list(disp_all_files().items())[:10])

Файл в процессе: id_tag_nsteps_0.csv
Файл в процессе: id_tag_nsteps_1.csv
Файл  обработан: id_tag_nsteps_1.csv
Файл  обработан: id_tag_nsteps_0.csv
Файл в процессе: id_tag_nsteps_2.csv
Файл в процессе: id_tag_nsteps_3.csv
Файл  обработан: id_tag_nsteps_2.csv
Файл в процессе: id_tag_nsteps_4.csv
Файл  обработан: id_tag_nsteps_3.csv
Файл в процессе: id_tag_nsteps_5.csv
Файл  обработан: id_tag_nsteps_4.csv
Файл в процессе: id_tag_nsteps_6.csv
Файл  обработан: id_tag_nsteps_5.csv
Файл в процессе: id_tag_nsteps_7.csv
Файл  обработан: id_tag_nsteps_6.csv
Файл  обработан: id_tag_nsteps_7.csv
[('mexican', 5.302344316442439), ('healthy-2', 6.384162244806188), ('orange-roughy', 3.513425052701653), ('chicken-thighs-legs', 4.145581465931509), ('freezer', 4.033042234819468), ('whitefish', 3.514734127201888), ('pork-sausage', 4.256068444090729), ('filipino', 3.575355140695586), ('for-large-groups', 7.292883853009813), ('pasta-salad', 3.5048206710374084)]
CPU times: user 1.26 s, sys: 992 ms, total: 2

5. (*) Решите задачу 3, распараллелив вычисления с помощью модуля `multiprocessing`. Создайте фиксированное количество процессов (равное половине количества ядер на компьютере). При помощи очереди передайте названия файлов для обработки процессам и при помощи другой очереди заберите от них ответы. 

In [None]:
%%time

from collections import defaultdict
import ast
import multiprocessing

def get_steps_and_tag(q_in, q_out):
  while not q_in.empty():
    print('Размер  очереди:', q_in.qsize())
    fname = q_in.get()
    print('Файл в процессе:', fname)
    tags_and_steps = defaultdict(list)
    with open(fname) as f:  
      reader = csv.reader(f)
      n = 0
      for row in reader:
        tags = ast.literal_eval(row[1])
        steps = int(row[2])
        for tag in tags:
          tags_and_steps[tag].append(steps)
    del tags_and_steps['']
    print('Файл  обработан:', fname)
    q_out.put(tags_and_steps)
  return True

def disp_all_files(n=8, proc_n=2):
  tags_and_steps = defaultdict(list)
  q_out = multiprocessing.Queue()
  q_in = multiprocessing.Queue()
  fnames = [f'id_tag_nsteps_{i}.csv' for i in range(n)]
  _ = [q_in.put(fname) for fname in fnames]
  ps = [multiprocessing.Process(target=get_steps_and_tag, args=(q_in, q_out,)) for _ in range(proc_n)]
  _ = [p.start() for p in ps]
  all = [q_out.get() for _ in fnames]
  for cur in all:
    for key, value in cur.items():
      tags_and_steps[key] += value
  tags_and_steps = dict(zip(
    list(tags_and_steps.keys()), 
    list(map(lambda x: sum(tags_and_steps[x])/len(tags_and_steps[x]), tags_and_steps))
  ))
  return tags_and_steps

print(list(disp_all_files().items())[:10])

Размер  очереди: 8
Файл в процессе: id_tag_nsteps_0.csv
Размер  очереди: 7
Файл в процессе: id_tag_nsteps_1.csv
Файл  обработан: id_tag_nsteps_1.csv
Размер  очереди: 6
Файл в процессе: id_tag_nsteps_2.csv
Файл  обработан: id_tag_nsteps_0.csv
Размер  очереди: 5
Файл в процессе: id_tag_nsteps_3.csv
Файл  обработан: id_tag_nsteps_2.csv
Размер  очереди: 4
Файл в процессе: id_tag_nsteps_4.csv
Файл  обработан: id_tag_nsteps_3.csv
Размер  очереди: 3
Файл в процессе: id_tag_nsteps_5.csv


In [None]:
import time

def he(q_in, q_out):
  while not q_in.empty():
    print('Размер  очереди:', q_in.qsize())
    time.sleep(1)
    fname = q_in.get()
    q_out.put(fname*2)
  return True


q_out = multiprocessing.Queue()
q_in = multiprocessing.Queue()
fnames = [1,2,3]
_ = [q_in.put(fname) for fname in fnames]
ps = [multiprocessing.Process(target=he, args=(q_in, q_out,)) for _ in range(2)]
for p in ps:
  p.daemon = True
_ = [p.start() for p in ps]
# _ = [p.join() for p in ps]
all = [q_out.get() for _ in fnames]
print(all)

Размер  очереди: 3
Размер  очереди: 3
Размер  очереди: 2
Размер  очереди: 1
[2, 4, 6]
